commit
3d93c3a45d
|
@ -128,9 +128,9 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn
|
|||
int32_t numOfNotFillCols, const struct SNodeListNode* val);
|
||||
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
|
||||
|
||||
void taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity,
|
||||
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t slotId,
|
||||
int32_t order, const char* id, SExecTaskInfo* pTaskInfo, SFillInfo** ppFillInfo);
|
||||
int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity,
|
||||
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t slotId,
|
||||
int32_t order, const char* id, SExecTaskInfo* pTaskInfo, SFillInfo** ppFillInfo);
|
||||
|
||||
void* taosDestroyFillInfo(struct SFillInfo* pFillInfo);
|
||||
int32_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity);
|
||||
|
|
|
@ -405,8 +405,12 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
|
|||
// STimeWindow w = {0};
|
||||
// getInitialStartTimeWindow(pInterval, startKey, &w, order == TSDB_ORDER_ASC);
|
||||
pInfo->pFillInfo = NULL;
|
||||
taosCreateFillInfo(startKey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo,
|
||||
pInfo->primaryTsCol, order, id, pTaskInfo, &pInfo->pFillInfo);
|
||||
int32_t code = taosCreateFillInfo(startKey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo,
|
||||
pInfo->primaryTsCol, order, id, pTaskInfo, &pInfo->pFillInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
if (order == TSDB_ORDER_ASC) {
|
||||
pInfo->win.skey = win.skey;
|
||||
|
|
|
@ -536,9 +536,10 @@ static int32_t createTableCacheVal(const SMetaReader* pMetaReader, STableCachedV
|
|||
int32_t lino = 0;
|
||||
STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal));
|
||||
QUERY_CHECK_NULL(pVal, code, lino, _end, terrno);
|
||||
|
||||
pVal->pTags = NULL;
|
||||
pVal->pName = taosStrdup(pMetaReader->me.name);
|
||||
QUERY_CHECK_NULL(pVal->pName, code, lino, _end, terrno);
|
||||
pVal->pTags = NULL;
|
||||
|
||||
// only child table has tag value
|
||||
if (pMetaReader->me.type == TSDB_CHILD_TABLE) {
|
||||
|
|
|
@ -523,14 +523,14 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
|
|||
return pFillInfo->numOfRows - pFillInfo->index;
|
||||
}
|
||||
|
||||
void taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity,
|
||||
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t primaryTsSlotId,
|
||||
int32_t order, const char* id, SExecTaskInfo* pTaskInfo, SFillInfo** ppFillInfo) {
|
||||
int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity,
|
||||
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t primaryTsSlotId,
|
||||
int32_t order, const char* id, SExecTaskInfo* pTaskInfo, SFillInfo** ppFillInfo) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
if (fillType == TSDB_FILL_NONE) {
|
||||
(*ppFillInfo) = NULL;
|
||||
return;
|
||||
return code;
|
||||
}
|
||||
|
||||
SFillInfo* pFillInfo = taosMemoryCalloc(1, sizeof(SFillInfo));
|
||||
|
@ -572,10 +572,9 @@ _end:
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosArrayDestroy(pFillInfo->next.pRowVal);
|
||||
taosArrayDestroy(pFillInfo->prev.pRowVal);
|
||||
terrno = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
(*ppFillInfo) = pFillInfo;
|
||||
return code;
|
||||
}
|
||||
|
||||
void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) {
|
||||
|
|
|
@ -1230,7 +1230,7 @@ void destroyIntervalOperatorInfo(void* param) {
|
|||
}
|
||||
|
||||
static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
bool* pRes) {
|
||||
// the primary timestamp column
|
||||
bool needed = false;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -1298,10 +1298,9 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
|
|||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
return needed;
|
||||
*pRes = needed;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
|
||||
|
@ -1390,7 +1389,10 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode
|
|||
|
||||
code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
pInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, pTaskInfo);
|
||||
|
||||
pInfo->timeWindowInterpo = false;
|
||||
code = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, &pInfo->timeWindowInterpo);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
if (pInfo->timeWindowInterpo) {
|
||||
pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
|
||||
if (pInfo->binfo.resultRowInfo.openWindow == NULL) {
|
||||
|
@ -2085,7 +2087,9 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
|
|||
code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, pTaskInfo);
|
||||
iaInfo->timeWindowInterpo = false;
|
||||
code = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, &iaInfo->timeWindowInterpo);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
if (iaInfo->timeWindowInterpo) {
|
||||
iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
|
||||
}
|
||||
|
@ -2416,7 +2420,9 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva
|
|||
code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pIntervalInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, pTaskInfo);
|
||||
pIntervalInfo->timeWindowInterpo = false;
|
||||
code = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, &pIntervalInfo->timeWindowInterpo);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
if (pIntervalInfo->timeWindowInterpo) {
|
||||
pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
|
||||
if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
|
||||
|
|
Loading…
Reference in New Issue