adj operator result
This commit is contained in:
parent
58853d342d
commit
b1a4ea9472
|
@ -128,9 +128,9 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn
|
|||
int32_t numOfNotFillCols, const struct SNodeListNode* val);
|
||||
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
|
||||
|
||||
SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity,
|
||||
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t slotId,
|
||||
int32_t order, const char* id, SExecTaskInfo* pTaskInfo);
|
||||
void taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity,
|
||||
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t slotId,
|
||||
int32_t order, const char* id, SExecTaskInfo* pTaskInfo, SFillInfo** ppFillInfo);
|
||||
|
||||
void* taosDestroyFillInfo(struct SFillInfo* pFillInfo);
|
||||
int32_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity);
|
||||
|
|
|
@ -317,7 +317,7 @@ _end:
|
|||
}
|
||||
|
||||
static int32_t doFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SFillOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
|
@ -355,7 +355,7 @@ static int32_t doFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
|
||||
static SSDataBlock* doFill(SOperatorInfo* pOperator) {
|
||||
SSDataBlock* pRes = NULL;
|
||||
int32_t code = doFillNext(pOperator, &pRes);
|
||||
int32_t code = doFillNext(pOperator, &pRes);
|
||||
return pRes;
|
||||
}
|
||||
|
||||
|
@ -384,8 +384,9 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
|
|||
|
||||
// STimeWindow w = {0};
|
||||
// getInitialStartTimeWindow(pInterval, startKey, &w, order == TSDB_ORDER_ASC);
|
||||
pInfo->pFillInfo = taosCreateFillInfo(startKey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo,
|
||||
pInfo->primaryTsCol, order, id, pTaskInfo);
|
||||
pInfo->pFillInfo = NULL;
|
||||
taosCreateFillInfo(startKey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo,
|
||||
pInfo->primaryTsCol, order, id, pTaskInfo, &pInfo->pFillInfo);
|
||||
|
||||
if (order == TSDB_ORDER_ASC) {
|
||||
pInfo->win.skey = win.skey;
|
||||
|
|
|
@ -357,12 +357,13 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
destroySBuffInfo(pAggSup, &buffInfo);
|
||||
}
|
||||
|
||||
static SSDataBlock* buildCountResult(SOperatorInfo* pOperator) {
|
||||
static int32_t buildCountResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SStreamCountAggOperatorInfo* pInfo = pOperator->info;
|
||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||
|
@ -370,15 +371,18 @@ static SSDataBlock* buildCountResult(SOperatorInfo* pOperator) {
|
|||
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
||||
if (pInfo->pDelRes->info.rows > 0) {
|
||||
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||
return pInfo->pDelRes;
|
||||
(*ppRes) = pInfo->pDelRes;
|
||||
return code;
|
||||
}
|
||||
|
||||
doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
|
||||
if (pBInfo->pRes->info.rows > 0) {
|
||||
printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||
return pBInfo->pRes;
|
||||
(*ppRes) = pBInfo->pRes;
|
||||
return code;
|
||||
}
|
||||
return NULL;
|
||||
(*ppRes) = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t doStreamCountEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator, bool isParent) {
|
||||
|
@ -425,6 +429,7 @@ int32_t doStreamCountDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SStreamCountAggOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
if (!pInfo) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
@ -465,7 +470,7 @@ int32_t doStreamCountDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -475,6 +480,7 @@ void doStreamCountSaveCheckpoint(SOperatorInfo* pOperator) {
|
|||
int32_t lino = 0;
|
||||
void* pBuf = NULL;
|
||||
SStreamCountAggOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
|
||||
int32_t len = doStreamCountEncodeOpState(NULL, 0, pOperator, true);
|
||||
pBuf = taosMemoryCalloc(1, len);
|
||||
|
@ -492,7 +498,7 @@ void doStreamCountSaveCheckpoint(SOperatorInfo* pOperator) {
|
|||
_end:
|
||||
taosMemoryFreeClear(pBuf);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -617,7 +623,9 @@ static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
|||
(*ppRes) = NULL;
|
||||
return code;
|
||||
} else if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
SSDataBlock* opRes = buildCountResult(pOperator);
|
||||
SSDataBlock* opRes = NULL;
|
||||
code = buildCountResult(pOperator, &opRes);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (opRes) {
|
||||
(*ppRes) = opRes;
|
||||
return code;
|
||||
|
@ -715,7 +723,9 @@ static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
|||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
SSDataBlock* opRes = buildCountResult(pOperator);
|
||||
SSDataBlock* opRes = NULL;
|
||||
code = buildCountResult(pOperator, &opRes);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (opRes) {
|
||||
(*ppRes) = opRes;
|
||||
return code;
|
||||
|
@ -724,7 +734,7 @@ static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
|||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
setStreamOperatorCompleted(pOperator);
|
||||
(*ppRes) = NULL;
|
||||
|
@ -733,7 +743,7 @@ _end:
|
|||
|
||||
static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) {
|
||||
SSDataBlock* pRes = NULL;
|
||||
int32_t code = doStreamCountAggNext(pOperator, &pRes);
|
||||
int32_t code = doStreamCountAggNext(pOperator, &pRes);
|
||||
return pRes;
|
||||
}
|
||||
|
||||
|
@ -741,6 +751,7 @@ void streamCountReleaseState(SOperatorInfo* pOperator) {
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
int32_t resSize = sizeof(TSKEY);
|
||||
char* pBuff = taosMemoryCalloc(1, resSize);
|
||||
QUERY_CHECK_NULL(pBuff, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
||||
|
@ -758,7 +769,7 @@ void streamCountReleaseState(SOperatorInfo* pOperator) {
|
|||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -766,6 +777,7 @@ void streamCountReloadState(SOperatorInfo* pOperator) {
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SStreamCountAggOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||
int32_t size = 0;
|
||||
void* pBuf = NULL;
|
||||
|
@ -787,7 +799,7 @@ void streamCountReloadState(SOperatorInfo* pOperator) {
|
|||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -896,6 +908,6 @@ _error:
|
|||
|
||||
taosMemoryFreeClear(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -285,7 +285,7 @@ static int32_t compactEventWindow(SOperatorInfo* pOperator, SEventWindowInfo* pC
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -444,7 +444,7 @@ _end:
|
|||
colDataDestroy(pColEnd);
|
||||
taosMemoryFree(pColEnd);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -490,6 +490,7 @@ int32_t doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
if (!pInfo) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
@ -532,7 +533,7 @@ int32_t doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -555,7 +556,8 @@ void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* buildEventResult(SOperatorInfo* pOperator) {
|
||||
static int32_t buildEventResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
|
||||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
@ -563,15 +565,18 @@ static SSDataBlock* buildEventResult(SOperatorInfo* pOperator) {
|
|||
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
||||
if (pInfo->pDelRes->info.rows > 0) {
|
||||
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||
return pInfo->pDelRes;
|
||||
(*ppRes) = pInfo->pDelRes;
|
||||
return code;
|
||||
}
|
||||
|
||||
doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes);
|
||||
if (pBInfo->pRes->info.rows > 0) {
|
||||
printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||
return pBInfo->pRes;
|
||||
(*ppRes) = pBInfo->pRes;
|
||||
return code;
|
||||
}
|
||||
return NULL;
|
||||
(*ppRes) = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||
|
@ -588,7 +593,9 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
qDebug("===stream=== stream event agg");
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
SSDataBlock* resBlock = buildEventResult(pOperator);
|
||||
SSDataBlock* resBlock = NULL;
|
||||
code = buildEventResult(pOperator, &resBlock);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (resBlock != NULL) {
|
||||
(*ppRes) = resBlock;
|
||||
return code;
|
||||
|
@ -700,7 +707,9 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
|||
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
SSDataBlock* resBlock = buildEventResult(pOperator);
|
||||
SSDataBlock* resBlock = NULL;
|
||||
code = buildEventResult(pOperator, &resBlock);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (resBlock != NULL) {
|
||||
(*ppRes) = resBlock;
|
||||
return code;
|
||||
|
@ -709,7 +718,7 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
|||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
setStreamOperatorCompleted(pOperator);
|
||||
(*ppRes) = NULL;
|
||||
|
@ -718,7 +727,7 @@ _end:
|
|||
|
||||
static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
|
||||
SSDataBlock* pRes = NULL;
|
||||
int32_t code = doStreamEventAggNext(pOperator, &pRes);
|
||||
int32_t code = doStreamEventAggNext(pOperator, &pRes);
|
||||
return pRes;
|
||||
}
|
||||
|
||||
|
@ -747,6 +756,7 @@ void streamEventReloadState(SOperatorInfo* pOperator) {
|
|||
int32_t lino = 0;
|
||||
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
|
||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
resetWinRange(&pAggSup->winRange);
|
||||
|
||||
SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0};
|
||||
|
@ -830,7 +840,7 @@ void streamEventReloadState(SOperatorInfo* pOperator) {
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -951,6 +961,6 @@ _error:
|
|||
destroyStreamEventOperatorInfo(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -742,7 +742,7 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) {
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -751,6 +751,7 @@ static int32_t buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint
|
|||
int32_t lino = 0;
|
||||
SStorageAPI* pAPI = &pOp->pTaskInfo->storageAPI;
|
||||
void* pState = pOp->pTaskInfo->streamInfo.pState;
|
||||
SExecTaskInfo* pTaskInfo = pOp->pTaskInfo;
|
||||
SSDataBlock* pBlock = delRes;
|
||||
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||
|
@ -793,7 +794,7 @@ static int32_t buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -804,6 +805,7 @@ static int32_t buildDeleteResult(SOperatorInfo* pOperator, TSKEY startTs, TSKEY
|
|||
int32_t lino = 0;
|
||||
SStreamFillOperatorInfo* pInfo = pOperator->info;
|
||||
SStreamFillSupporter* pFillSup = pInfo->pFillSup;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
if (hasPrevWindow(pFillSup)) {
|
||||
TSKEY start = getNextWindowTs(pFillSup->prev.key, &pFillSup->interval);
|
||||
code = buildDeleteRange(pOperator, start, endTs, groupId, delRes);
|
||||
|
@ -819,7 +821,7 @@ static int32_t buildDeleteResult(SOperatorInfo* pOperator, TSKEY startTs, TSKEY
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -829,6 +831,7 @@ static int32_t doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, T
|
|||
int32_t lino = 0;
|
||||
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
||||
SStreamFillOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
getWindowFromDiscBuf(pOperator, startTs, groupId, pInfo->pFillSup);
|
||||
setDeleteFillValueInfo(startTs, endTs, pInfo->pFillSup, pInfo->pFillInfo);
|
||||
SWinKey key = {.ts = startTs, .groupId = groupId};
|
||||
|
@ -851,7 +854,7 @@ static int32_t doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, T
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -910,6 +913,7 @@ static int32_t doDeleteFillResult(SOperatorInfo* pOperator) {
|
|||
SStreamFillOperatorInfo* pInfo = pOperator->info;
|
||||
SStreamFillInfo* pFillInfo = pInfo->pFillInfo;
|
||||
SSDataBlock* pBlock = pInfo->pSrcDelBlock;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
TSKEY* tsStarts = (TSKEY*)pStartCol->pData;
|
||||
|
@ -966,7 +970,7 @@ static int32_t doDeleteFillResult(SOperatorInfo* pOperator) {
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -984,6 +988,7 @@ static int32_t doApplyStreamScalarCalculation(SOperatorInfo* pOperator, SSDataBl
|
|||
int32_t lino = 0;
|
||||
SStreamFillOperatorInfo* pInfo = pOperator->info;
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
blockDataCleanup(pDstBlock);
|
||||
code = blockDataEnsureCapacity(pDstBlock, pSrcBlock->info.rows);
|
||||
|
@ -1007,7 +1012,7 @@ static int32_t doApplyStreamScalarCalculation(SOperatorInfo* pOperator, SSDataBl
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -1140,7 +1145,7 @@ static int32_t doStreamFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
setOperatorCompleted(pOperator);
|
||||
resetStreamFillInfo(pInfo);
|
||||
|
@ -1150,7 +1155,7 @@ _end:
|
|||
|
||||
static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
|
||||
SSDataBlock* pRes = NULL;
|
||||
int32_t code = doStreamFillNext(pOperator, &pRes);
|
||||
int32_t code = doStreamFillNext(pOperator, &pRes);
|
||||
return pRes;
|
||||
}
|
||||
|
||||
|
@ -1423,7 +1428,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
|
|||
|
||||
_error:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
destroyStreamFillOperatorInfo(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
|
|
|
@ -236,6 +236,7 @@ static int32_t doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, S
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData;
|
||||
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||
|
@ -301,7 +302,7 @@ static int32_t doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, S
|
|||
}
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -335,6 +336,7 @@ static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp
|
|||
void* pIte = NULL;
|
||||
int32_t iter = 0;
|
||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
int32_t delSize = taosArrayGetSize(pDelWins);
|
||||
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
|
||||
void* key = tSimpleHashGetKey(pIte, NULL);
|
||||
|
@ -376,7 +378,7 @@ static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -517,9 +519,10 @@ void reloadFromDownStream(SOperatorInfo* downstream, SStreamIntervalOperatorInfo
|
|||
}
|
||||
|
||||
int32_t initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore;
|
||||
SExecTaskInfo* pTaskInfo = downstream->pTaskInfo;
|
||||
|
||||
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
return initIntervalDownStream(downstream->pDownstream[0], type, pInfo);
|
||||
|
@ -542,7 +545,7 @@ int32_t initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStream
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -584,7 +587,7 @@ int32_t compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, i
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -946,7 +949,7 @@ void buildDataBlockFromGroupRes(SOperatorInfo* pOperator, void* pState, SSDataBl
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1149,7 +1152,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat
|
|||
}
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1332,6 +1335,7 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
if (!pInfo) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
@ -1401,7 +1405,7 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1445,15 +1449,16 @@ _end:
|
|||
return code;
|
||||
}
|
||||
|
||||
static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) {
|
||||
static int32_t buildIntervalResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
uint16_t opType = pOperator->operatorType;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
uint16_t opType = pOperator->operatorType;
|
||||
|
||||
// check if query task is closed or not
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
return NULL;
|
||||
(*ppRes) = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
if (IS_FINAL_INTERVAL_OP(pOperator)) {
|
||||
|
@ -1461,7 +1466,8 @@ static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) {
|
|||
if (pInfo->pPullDataRes->info.rows != 0) {
|
||||
// process the rest of the data
|
||||
printDataBlock(pInfo->pPullDataRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
|
||||
return pInfo->pPullDataRes;
|
||||
(*ppRes) = pInfo->pPullDataRes;
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1469,16 +1475,19 @@ static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) {
|
|||
if (pInfo->pDelRes->info.rows != 0) {
|
||||
// process the rest of the data
|
||||
printDataBlock(pInfo->pDelRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
|
||||
return pInfo->pDelRes;
|
||||
(*ppRes) = pInfo->pDelRes;
|
||||
return code;
|
||||
}
|
||||
|
||||
doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
|
||||
if (pInfo->binfo.pRes->info.rows != 0) {
|
||||
printDataBlock(pInfo->binfo.pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
|
||||
return pInfo->binfo.pRes;
|
||||
(*ppRes) = pInfo->binfo.pRes;
|
||||
return code;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
(*ppRes) = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t copyUpdateResult(SSHashObj** ppWinUpdated, SArray* pUpdated, __compar_fn_t compar) {
|
||||
|
@ -1520,7 +1529,9 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc
|
|||
(*ppRes) = NULL;
|
||||
return code;
|
||||
} else if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
SSDataBlock* resBlock = buildIntervalResult(pOperator);
|
||||
SSDataBlock* resBlock = NULL;
|
||||
code = buildIntervalResult(pOperator, &resBlock);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (resBlock != NULL) {
|
||||
(*ppRes) = resBlock;
|
||||
return code;
|
||||
|
@ -1549,7 +1560,9 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc
|
|||
return code;
|
||||
} else {
|
||||
if (!IS_FINAL_INTERVAL_OP(pOperator)) {
|
||||
SSDataBlock* resBlock = buildIntervalResult(pOperator);
|
||||
SSDataBlock* resBlock = NULL;
|
||||
code = buildIntervalResult(pOperator, &resBlock);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (resBlock != NULL) {
|
||||
(*ppRes) = resBlock;
|
||||
return code;
|
||||
|
@ -1712,7 +1725,9 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc
|
|||
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
SSDataBlock* resBlock = buildIntervalResult(pOperator);
|
||||
SSDataBlock* resBlock = NULL;
|
||||
code = buildIntervalResult(pOperator, &resBlock);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (resBlock != NULL) {
|
||||
(*ppRes) = resBlock;
|
||||
return code;
|
||||
|
@ -1728,7 +1743,7 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc
|
|||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
setStreamOperatorCompleted(pOperator);
|
||||
(*ppRes) = NULL;
|
||||
|
@ -1737,7 +1752,7 @@ _end:
|
|||
|
||||
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||
SSDataBlock* pRes = NULL;
|
||||
int32_t code = doStreamFinalIntervalAggNext(pOperator, &pRes);
|
||||
int32_t code = doStreamFinalIntervalAggNext(pOperator, &pRes);
|
||||
return pRes;
|
||||
}
|
||||
|
||||
|
@ -1799,6 +1814,7 @@ void streamIntervalReloadState(SOperatorInfo* pOperator) {
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL &&
|
||||
pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) {
|
||||
int32_t size = 0;
|
||||
|
@ -1820,7 +1836,7 @@ void streamIntervalReloadState(SOperatorInfo* pOperator) {
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2036,8 +2052,9 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num
|
|||
|
||||
int32_t initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex,
|
||||
STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SExecTaskInfo* pTaskInfo = downstream->pTaskInfo;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) {
|
||||
SStreamPartitionOperatorInfo* pScanInfo = downstream->info;
|
||||
pScanInfo->tsColIndex = tsColIndex;
|
||||
|
@ -2062,7 +2079,7 @@ int32_t initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup,
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -2328,7 +2345,7 @@ int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo*
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -2404,7 +2421,7 @@ int32_t compactTimeWindow(SExprSupp* pSup, SStreamAggSupporter* pAggSup, STimeWi
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -2442,7 +2459,7 @@ static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo*
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -2574,7 +2591,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2631,9 +2648,10 @@ inline int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2) {
|
|||
}
|
||||
|
||||
void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SStorageAPI* pAPI = &pOp->pTaskInfo->storageAPI;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SStorageAPI* pAPI = &pOp->pTaskInfo->storageAPI;
|
||||
SExecTaskInfo* pTaskInfo = pOp->pTaskInfo;
|
||||
|
||||
blockDataCleanup(pBlock);
|
||||
int32_t size = tSimpleHashGetSize(pStDeleted);
|
||||
|
@ -2699,7 +2717,7 @@ _end:
|
|||
}
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2786,7 +2804,7 @@ static int32_t rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray,
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -2974,7 +2992,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -3003,11 +3021,12 @@ void doBuildSessionResult(SOperatorInfo* pOperator, void* pState, SGroupResInfo*
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* buildSessionResult(SOperatorInfo* pOperator) {
|
||||
static int32_t buildSessionResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||
|
@ -3015,15 +3034,18 @@ static SSDataBlock* buildSessionResult(SOperatorInfo* pOperator) {
|
|||
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
||||
if (pInfo->pDelRes->info.rows > 0) {
|
||||
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||
return pInfo->pDelRes;
|
||||
(*ppRes) = pInfo->pDelRes;
|
||||
return code;
|
||||
}
|
||||
|
||||
doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
|
||||
if (pBInfo->pRes->info.rows > 0) {
|
||||
printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||
return pBInfo->pRes;
|
||||
(*ppRes) = pBInfo->pRes;
|
||||
return code;
|
||||
}
|
||||
return NULL;
|
||||
(*ppRes) = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) {
|
||||
|
@ -3143,6 +3165,7 @@ int32_t doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpe
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
if (!pInfo) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
@ -3200,7 +3223,7 @@ int32_t doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpe
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -3268,7 +3291,9 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp
|
|||
(*ppRes) = NULL;
|
||||
return code;
|
||||
} else if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
SSDataBlock* opRes = buildSessionResult(pOperator);
|
||||
SSDataBlock* opRes = NULL;
|
||||
code = buildSessionResult(pOperator, &opRes);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (opRes) {
|
||||
(*ppRes) = opRes;
|
||||
return code;
|
||||
|
@ -3419,7 +3444,9 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp
|
|||
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
SSDataBlock* opRes = buildSessionResult(pOperator);
|
||||
SSDataBlock* opRes = NULL;
|
||||
code = buildSessionResult(pOperator, &opRes);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (opRes) {
|
||||
(*ppRes) = opRes;
|
||||
return code;
|
||||
|
@ -3428,7 +3455,7 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp
|
|||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
setStreamOperatorCompleted(pOperator);
|
||||
(*ppRes) = NULL;
|
||||
|
@ -3437,7 +3464,7 @@ _end:
|
|||
|
||||
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||
SSDataBlock* pRes = NULL;
|
||||
int32_t code = doStreamSessionAggNext(pOperator, &pRes);
|
||||
int32_t code = doStreamSessionAggNext(pOperator, &pRes);
|
||||
return pRes;
|
||||
}
|
||||
|
||||
|
@ -3506,6 +3533,7 @@ void streamSessionSemiReloadState(SOperatorInfo* pOperator) {
|
|||
int32_t lino = 0;
|
||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
resetWinRange(&pAggSup->winRange);
|
||||
|
||||
SResultWindowInfo winInfo = {0};
|
||||
|
@ -3542,7 +3570,7 @@ void streamSessionSemiReloadState(SOperatorInfo* pOperator) {
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3551,6 +3579,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
|
|||
int32_t lino = 0;
|
||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
resetWinRange(&pAggSup->winRange);
|
||||
|
||||
int32_t size = 0;
|
||||
|
@ -3614,7 +3643,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3736,7 +3765,7 @@ _error:
|
|||
|
||||
taosMemoryFreeClear(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -3792,7 +3821,9 @@ static int32_t doStreamSessionSemiAggNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
}
|
||||
|
||||
{
|
||||
SSDataBlock* opRes = buildSessionResult(pOperator);
|
||||
SSDataBlock* opRes = NULL;
|
||||
code = buildSessionResult(pOperator, &opRes);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (opRes) {
|
||||
(*ppRes) = opRes;
|
||||
return code;
|
||||
|
@ -3889,7 +3920,9 @@ static int32_t doStreamSessionSemiAggNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
SSDataBlock* opRes = buildSessionResult(pOperator);
|
||||
SSDataBlock* opRes = NULL;
|
||||
code = buildSessionResult(pOperator, &opRes);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (opRes) {
|
||||
(*ppRes) = opRes;
|
||||
return code;
|
||||
|
@ -3898,7 +3931,7 @@ static int32_t doStreamSessionSemiAggNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
|
||||
clearFunctionContext(&pOperator->exprSupp);
|
||||
|
@ -3911,7 +3944,7 @@ _end:
|
|||
|
||||
static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||
SSDataBlock* pRes = NULL;
|
||||
int32_t code = doStreamSessionSemiAggNext(pOperator, &pRes);
|
||||
int32_t code = doStreamSessionSemiAggNext(pOperator, &pRes);
|
||||
return pRes;
|
||||
}
|
||||
|
||||
|
@ -3982,7 +4015,7 @@ _error:
|
|||
taosMemoryFreeClear(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
@ -4326,7 +4359,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4383,6 +4416,7 @@ int32_t doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
if (!pInfo) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
@ -4440,7 +4474,7 @@ int32_t doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -4463,7 +4497,8 @@ void doStreamStateSaveCheckpoint(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* buildStateResult(SOperatorInfo* pOperator) {
|
||||
static int32_t buildStateResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
@ -4471,15 +4506,18 @@ static SSDataBlock* buildStateResult(SOperatorInfo* pOperator) {
|
|||
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
||||
if (pInfo->pDelRes->info.rows > 0) {
|
||||
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||
return pInfo->pDelRes;
|
||||
(*ppRes) = pInfo->pDelRes;
|
||||
return code;
|
||||
}
|
||||
|
||||
doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes);
|
||||
if (pBInfo->pRes->info.rows > 0) {
|
||||
printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||
return pBInfo->pRes;
|
||||
(*ppRes) = pBInfo->pRes;
|
||||
return code;
|
||||
}
|
||||
return NULL;
|
||||
(*ppRes) = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t doStreamStateAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||
|
@ -4496,7 +4534,9 @@ static int32_t doStreamStateAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
qDebug("===stream=== stream state agg");
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
SSDataBlock* resBlock = buildStateResult(pOperator);
|
||||
SSDataBlock* resBlock = NULL;
|
||||
code = buildStateResult(pOperator, &resBlock);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (resBlock != NULL) {
|
||||
(*ppRes) = resBlock;
|
||||
return code;
|
||||
|
@ -4597,7 +4637,9 @@ static int32_t doStreamStateAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
|||
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
SSDataBlock* resBlock = buildStateResult(pOperator);
|
||||
SSDataBlock* resBlock = NULL;
|
||||
code = buildStateResult(pOperator, &resBlock);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (resBlock != NULL) {
|
||||
(*ppRes) = resBlock;
|
||||
return code;
|
||||
|
@ -4606,7 +4648,7 @@ static int32_t doStreamStateAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
|||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
setStreamOperatorCompleted(pOperator);
|
||||
(*ppRes) = NULL;
|
||||
|
@ -4615,7 +4657,7 @@ _end:
|
|||
|
||||
static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
||||
SSDataBlock* pRes = NULL;
|
||||
int32_t code = doStreamStateAggNext(pOperator, &pRes);
|
||||
int32_t code = doStreamStateAggNext(pOperator, &pRes);
|
||||
return pRes;
|
||||
}
|
||||
|
||||
|
@ -4652,6 +4694,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
|
|||
int32_t lino = 0;
|
||||
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
resetWinRange(&pAggSup->winRange);
|
||||
|
||||
SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0};
|
||||
|
@ -4728,7 +4771,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4837,7 +4880,7 @@ _error:
|
|||
destroyStreamStateOperatorInfo(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -4869,7 +4912,9 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p
|
|||
}
|
||||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
SSDataBlock* resBlock = buildIntervalResult(pOperator);
|
||||
SSDataBlock* resBlock = NULL;
|
||||
code = buildIntervalResult(pOperator, &resBlock);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (resBlock != NULL) {
|
||||
(*ppRes) = resBlock;
|
||||
return code;
|
||||
|
@ -5000,13 +5045,15 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p
|
|||
tSimpleHashCleanup(pInfo->pUpdatedMap);
|
||||
pInfo->pUpdatedMap = NULL;
|
||||
|
||||
(*ppRes) = buildIntervalResult(pOperator);
|
||||
code = buildIntervalResult(pOperator, ppRes);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
return code;
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
setStreamOperatorCompleted(pOperator);
|
||||
(*ppRes) = NULL;
|
||||
|
@ -5015,7 +5062,7 @@ _end:
|
|||
|
||||
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||
SSDataBlock* pRes = NULL;
|
||||
int32_t code = doStreamIntervalAggNext(pOperator, &pRes);
|
||||
int32_t code = doStreamIntervalAggNext(pOperator, &pRes);
|
||||
return pRes;
|
||||
}
|
||||
|
||||
|
@ -5247,7 +5294,7 @@ static void doStreamMidIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pS
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5314,7 +5361,9 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
(*ppRes) = NULL;
|
||||
return code;
|
||||
} else if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
SSDataBlock* resBlock = buildIntervalResult(pOperator);
|
||||
SSDataBlock* resBlock = NULL;
|
||||
code = buildIntervalResult(pOperator, &resBlock);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (resBlock != NULL) {
|
||||
(*ppRes) = resBlock;
|
||||
return code;
|
||||
|
@ -5327,7 +5376,9 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
(*ppRes) = NULL;
|
||||
return code;
|
||||
} else {
|
||||
SSDataBlock* resBlock = buildIntervalResult(pOperator);
|
||||
SSDataBlock* resBlock = NULL;
|
||||
code = buildIntervalResult(pOperator, &resBlock);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (resBlock != NULL) {
|
||||
(*ppRes) = resBlock;
|
||||
return code;
|
||||
|
@ -5490,7 +5541,9 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
SSDataBlock* resBlock = buildIntervalResult(pOperator);
|
||||
SSDataBlock* resBlock = NULL;
|
||||
code = buildIntervalResult(pOperator, &resBlock);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (resBlock != NULL) {
|
||||
(*ppRes) = resBlock;
|
||||
return code;
|
||||
|
@ -5511,7 +5564,7 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
}
|
||||
(*ppRes) = NULL;
|
||||
return code;
|
||||
|
@ -5519,7 +5572,7 @@ _end:
|
|||
|
||||
static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
|
||||
SSDataBlock* pRes = NULL;
|
||||
int32_t code = doStreamMidIntervalAggNext(pOperator, &pRes);
|
||||
int32_t code = doStreamMidIntervalAggNext(pOperator, &pRes);
|
||||
return pRes;
|
||||
}
|
||||
|
||||
|
|
|
@ -1984,7 +1984,11 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
|
|||
|
||||
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
|
||||
char* buf1 = taosMemoryCalloc(1, contLen);
|
||||
(void)tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);
|
||||
int32_t tempRes = tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);
|
||||
if (tempRes < 0) {
|
||||
code = terrno;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// send the fetch remote task result reques
|
||||
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
|
@ -2589,7 +2593,11 @@ static int32_t doBlockInfoScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes
|
|||
char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE);
|
||||
QUERY_CHECK_NULL(p, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
||||
|
||||
(void)tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo);
|
||||
int32_t tempRes = tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo);
|
||||
if (tempRes < 0) {
|
||||
code = terrno;
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
varDataSetLen(p, len);
|
||||
|
||||
code = colDataSetVal(pColInfo, 0, p, false);
|
||||
|
@ -2619,7 +2627,7 @@ _end:
|
|||
|
||||
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
|
||||
SSDataBlock* pRes = NULL;
|
||||
int32_t code = doBlockInfoScanNext(pOperator, &pRes);
|
||||
int32_t code = doBlockInfoScanNext(pOperator, &pRes);
|
||||
return pRes;
|
||||
}
|
||||
|
||||
|
|
|
@ -47,9 +47,9 @@ static void setNotFillColumn(SFillInfo* pFillInfo, SColumnInfoData* pDstColInfo,
|
|||
}
|
||||
|
||||
SGroupKeys* pKey = taosArrayGet(p->pRowVal, colIdx);
|
||||
int32_t code = doSetVal(pDstColInfo, rowIndex, pKey);
|
||||
int32_t code = doSetVal(pDstColInfo, rowIndex, pKey);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
T_LONG_JMP(pFillInfo->pTaskInfo->env, code);
|
||||
}
|
||||
}
|
||||
|
@ -511,13 +511,14 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
|
|||
return pFillInfo->numOfRows - pFillInfo->index;
|
||||
}
|
||||
|
||||
struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity,
|
||||
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol,
|
||||
int32_t primaryTsSlotId, int32_t order, const char* id, SExecTaskInfo* pTaskInfo) {
|
||||
void taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity,
|
||||
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t primaryTsSlotId,
|
||||
int32_t order, const char* id, SExecTaskInfo* pTaskInfo, SFillInfo** ppFillInfo) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
if (fillType == TSDB_FILL_NONE) {
|
||||
return NULL;
|
||||
(*ppFillInfo) = NULL;
|
||||
return;
|
||||
}
|
||||
|
||||
SFillInfo* pFillInfo = taosMemoryCalloc(1, sizeof(SFillInfo));
|
||||
|
@ -562,7 +563,7 @@ _end:
|
|||
terrno = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
return pFillInfo;
|
||||
(*ppFillInfo) = pFillInfo;
|
||||
}
|
||||
|
||||
void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) {
|
||||
|
|
Loading…
Reference in New Issue