Merge remote-tracking branch 'origin/3.0' into feature/dnode

This commit is contained in:
Shengliang Guan 2022-05-04 12:22:00 +08:00
commit 63fcbbf42a
20 changed files with 2019 additions and 2060 deletions

View File

@ -186,6 +186,7 @@ struct STsdbFS {
#define REPO_ID(r) TD_VID((r)->pVnode)
#define REPO_CFG(r) (&(r)->pVnode->config.tsdbCfg)
#define REPO_LEVEL(r) ((r)->level)
#define REPO_FS(r) ((r)->fs)
#define REPO_META(r) ((r)->pVnode->pMeta)
#define REPO_TFS(r) ((r)->pVnode->pTfs)

View File

@ -15,6 +15,8 @@
#include "tsdb.h"
extern const char *TSDB_LEVEL_DNAME[];
typedef enum { TSDB_TXN_TEMP_FILE = 0, TSDB_TXN_CURR_FILE } TSDB_TXN_FILE_T;
static const char *tsdbTxnFname[] = {"current.t", "current"};
#define TSDB_MAX_FSETS(keep, days) ((keep) / (days) + 3)
@ -35,12 +37,12 @@ static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired);
// static int tsdbProcessExpiredFS(STsdb *pRepo);
// static int tsdbCreateMeta(STsdb *pRepo);
static void tsdbGetRootDir(int repoid, char dirName[]) {
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb", repoid);
static void tsdbGetRootDir(int repoid, int8_t level, char dirName[]) {
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s", repoid, TSDB_LEVEL_DNAME[level]);
}
static void tsdbGetDataDir(int repoid, char dirName[]) {
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data", repoid);
static void tsdbGetDataDir(int repoid, int8_t level, char dirName[]) {
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/data", repoid, TSDB_LEVEL_DNAME[level]);
}
// For backward compatibility
@ -588,8 +590,8 @@ static int tsdbComparFidFSet(const void *arg1, const void *arg2) {
}
static void tsdbGetTxnFname(STsdb *pRepo, TSDB_TXN_FILE_T ftype, char fname[]) {
snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", tfsGetPrimaryPath(REPO_TFS(pRepo)), REPO_ID(pRepo),
tsdbTxnFname[ftype]);
snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/%s/%s", tfsGetPrimaryPath(REPO_TFS(pRepo)), REPO_ID(pRepo),
TSDB_LEVEL_DNAME[REPO_LEVEL(pRepo)], tsdbTxnFname[ftype]);
}
static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
@ -719,7 +721,7 @@ static int tsdbScanRootDir(STsdb *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo);
const STfsFile *pf;
tsdbGetRootDir(REPO_ID(pRepo), rootDir);
tsdbGetRootDir(REPO_ID(pRepo), REPO_LEVEL(pRepo), rootDir);
STfsDir *tdir = tfsOpendir(REPO_TFS(pRepo), rootDir);
if (tdir == NULL) {
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), rootDir, tstrerror(terrno));
@ -753,7 +755,7 @@ static int tsdbScanDataDir(STsdb *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo);
const STfsFile *pf;
tsdbGetDataDir(REPO_ID(pRepo), dataDir);
tsdbGetDataDir(REPO_ID(pRepo), REPO_LEVEL(pRepo), dataDir);
STfsDir *tdir = tfsOpendir(REPO_TFS(pRepo), dataDir);
if (tdir == NULL) {
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), dataDir, tstrerror(terrno));
@ -801,7 +803,7 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) {
regex_t regex;
STsdbFS *pfs = REPO_FS(pRepo);
tsdbGetDataDir(REPO_ID(pRepo), dataDir);
tsdbGetDataDir(REPO_ID(pRepo), REPO_LEVEL(pRepo), dataDir);
// Resource allocation and init
regcomp(&regex, pattern, REG_EXTENDED);

View File

@ -27,7 +27,7 @@ static const char *TSDB_FNAME_SUFFIX[] = {
"rsma", // TSDB_FILE_RSMA
};
static const char *TSDB_LEVEL_DNAME[] = {
const char *TSDB_LEVEL_DNAME[] = {
"tsdb",
"rsma1",
"rsma2",

View File

@ -98,46 +98,46 @@ typedef struct SIOCostSummary {
} SIOCostSummary;
typedef struct SBlockLoadSuppInfo {
SColumnDataAgg *pstatis;
SColumnDataAgg **plist;
SArray *defaultLoadColumn; // default load column
int32_t *slotIds; // colId to slotId
SColumnDataAgg* pstatis;
SColumnDataAgg** plist;
SArray* defaultLoadColumn; // default load column
int32_t* slotIds; // colId to slotId
} SBlockLoadSuppInfo;
typedef struct STsdbReadHandle {
STsdb* pTsdb;
SQueryFilePos cur; // current position
int16_t order;
STimeWindow window; // the primary query time window that applies to all queries
// SColumnDataAgg* statis; // query level statistics, only one table block statistics info exists at any time
// SColumnDataAgg** pstatis;// the ptr array list to return to caller
int32_t numOfBlocks;
SArray* pColumns; // column list, SColumnInfoData array list
bool locateStart;
int32_t outputCapacity;
int32_t realNumOfRows;
SArray* pTableCheckInfo; // SArray<STableCheckInfo>
int32_t activeIndex;
bool checkFiles; // check file stage
int8_t cachelastrow; // check if last row cached
bool loadExternalRow; // load time window external data rows
bool currentLoadExternalRows; // current load external rows
int32_t loadType; // block load type
char* idStr; // query info handle, for debug purpose
STsdb* pTsdb;
SQueryFilePos cur; // current position
int16_t order;
STimeWindow window; // the primary query time window that applies to all queries
// SColumnDataAgg* statis; // query level statistics, only one table block statistics info exists at any time
// SColumnDataAgg** pstatis;// the ptr array list to return to caller
int32_t numOfBlocks;
SArray* pColumns; // column list, SColumnInfoData array list
bool locateStart;
int32_t outputCapacity;
int32_t realNumOfRows;
SArray* pTableCheckInfo; // SArray<STableCheckInfo>
int32_t activeIndex;
bool checkFiles; // check file stage
int8_t cachelastrow; // check if last row cached
bool loadExternalRow; // load time window external data rows
bool currentLoadExternalRows; // current load external rows
int32_t loadType; // block load type
char* idStr; // query info handle, for debug purpose
int32_t type; // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
SDFileSet* pFileGroup;
SFSIter fileIter;
SReadH rhelper;
STableBlockInfo* pDataBlockInfo;
SDataCols* pDataCols; // in order to hold current file data block
int32_t allocSize; // allocated data block size
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
SDataCols* pDataCols; // in order to hold current file data block
int32_t allocSize; // allocated data block size
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
SBlockLoadSuppInfo suppInfo;
SArray* prev; // previous row which is before than time window
SArray* next; // next row which is after the query time window
SIOCostSummary cost;
STSchema* pSchema;
SArray* prev; // previous row which is before than time window
SArray* next; // next row which is after the query time window
SIOCostSummary cost;
STSchema* pSchema;
} STsdbReadHandle;
typedef struct STableGroupSupporter {
@ -155,7 +155,6 @@ static int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle);
static void changeQueryHandleForInterpQuery(tsdbReaderT pHandle);
static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock);
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
STsdbReadHandle* pTsdbReadHandle);
static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2);
@ -164,6 +163,8 @@ static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2);
// static void* destroyTableCheckInfo(SArray* pTableCheckInfo);
static bool tsdbGetExternalRow(tsdbReaderT pHandle);
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions);
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
pBlockLoadInfo->slot = -1;
pBlockLoadInfo->uid = 0;
@ -350,12 +351,38 @@ static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableData
pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
}
}
#if 0
int nQUERY = 0;
#endif
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions) {
if (vnodeIsRollup(pVnode)) {
// for(int32_t i=0; i< TSDB_; ) {
// }
int level = 0;
#if 1
int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);
for (int i = 0; i < TSDB_RETENTION_MAX; ++i) {
SRetention* pRetention = retentions + i;
if (pRetention->keep <= 0 || (now - pRetention->keep) >= winSKey) {
break;
}
}
#endif
#if 0
++nQUERY;
if(nQUERY%3 == 0) {
level = 2;
} else if(nQUERY%2 == 0) {
level = 1;
} else {
level = 0;
}
#endif
if (level == TSDB_RETENTION_L0) {
return VND_RSMA0(pVnode);
} else if (level == TSDB_RETENTION_L1) {
return VND_RSMA1(pVnode);
} else {
return VND_RSMA2(pVnode);
}
}
return pVnode->pTsdb;
}
@ -420,8 +447,10 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond*
}
pReadHandle->suppInfo.defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
pReadHandle->suppInfo.slotIds = taosMemoryMalloc(sizeof(int32_t) * taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn));
pReadHandle->suppInfo.plist = taosMemoryCalloc(taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn), POINTER_BYTES);
pReadHandle->suppInfo.slotIds =
taosMemoryMalloc(sizeof(int32_t) * taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn));
pReadHandle->suppInfo.plist =
taosMemoryCalloc(taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn), POINTER_BYTES);
}
pReadHandle->pDataCols = tdNewDataCols(1000, pVnode->config.tsdbCfg.maxRows);
@ -444,7 +473,6 @@ _end:
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
uint64_t taskId) {
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId);
if (pTsdbReadHandle == NULL) {
return NULL;
@ -462,16 +490,16 @@ tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableG
return NULL;
}
STableCheckInfo *pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0);
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0);
pTsdbReadHandle->pSchema = metaGetTbTSchema(pVnode->pMeta, pCheckInfo->tableId, 0);
int32_t numOfCols = taosArrayGetSize(pTsdbReadHandle->suppInfo.defaultLoadColumn);
int32_t numOfCols = taosArrayGetSize(pTsdbReadHandle->suppInfo.defaultLoadColumn);
int16_t* ids = pTsdbReadHandle->suppInfo.defaultLoadColumn->pData;
STSchema* pSchema = pTsdbReadHandle->pSchema;
int32_t i = 0, j = 0;
while(i < numOfCols && j < pSchema->numOfCols) {
while (i < numOfCols && j < pSchema->numOfCols) {
if (ids[i] == pSchema->columns[j].colId) {
pTsdbReadHandle->suppInfo.slotIds[i] = j;
i++;
@ -1137,7 +1165,7 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl
int32_t slotIndex) {
int64_t st = taosGetTimestampUs();
int32_t code = tdInitDataCols(pTsdbReadHandle->pDataCols, pTsdbReadHandle->pSchema);
int32_t code = tdInitDataCols(pTsdbReadHandle->pDataCols, pTsdbReadHandle->pSchema);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p failed to malloc buf for pDataCols, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
@ -1308,6 +1336,8 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
return code;
}
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo,
bool* exists) {
SQueryFilePos* cur = &pTsdbReadHandle->cur;

View File

@ -229,10 +229,28 @@ int vnodeCommit(SVnode *pVnode) {
ASSERT(0);
return -1;
}
if (tsdbCommit(pVnode->pTsdb) < 0) {
ASSERT(0);
return -1;
if(vnodeIsRollup(pVnode)) {
if (tsdbCommit(VND_RSMA0(pVnode)) < 0) {
ASSERT(0);
return -1;
}
if (tsdbCommit(VND_RSMA1(pVnode)) < 0) {
ASSERT(0);
return -1;
}
if (tsdbCommit(VND_RSMA2(pVnode)) < 0) {
ASSERT(0);
return -1;
}
} else {
if (tsdbCommit(pVnode->pTsdb) < 0) {
ASSERT(0);
return -1;
}
}
if (tqCommit(pVnode->pTq) < 0) {
ASSERT(0);
return -1;

View File

@ -73,7 +73,7 @@ typedef struct SResKeyPos {
} SResKeyPos;
typedef struct SResultRowInfo {
SResultRowPosition *pPosition;
SResultRowPosition *pPosition; // todo remove this
int32_t size; // number of result set
int32_t capacity; // max capacity
SResultRowPosition cur;

View File

@ -20,6 +20,12 @@
extern "C" {
#endif
typedef struct {
char* pData;
bool isNull;
int16_t type;
int32_t bytes;
} SGroupKeys, SStateKeys;
#ifdef __cplusplus
}

View File

@ -40,6 +40,7 @@ extern "C" {
#include "tpagedbuf.h"
#include "vnode.h"
#include "executorInt.h"
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
@ -196,7 +197,7 @@ typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggS
struct SOptrBasicInfo* pInfo, char* result, int32_t length);
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr, bool* newgroup);
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr);
typedef void (*__optr_close_fn_t)(void* param, int32_t num);
typedef int32_t (*__optr_get_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain);
@ -424,7 +425,7 @@ typedef struct STimeWindowSupp {
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} STimeWindowAggSupp;
typedef struct STableIntervalOperatorInfo {
typedef struct SIntervalAggOperatorInfo {
SOptrBasicInfo binfo; // basic info
SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info
@ -439,7 +440,7 @@ typedef struct STableIntervalOperatorInfo {
SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
STimeWindowAggSupp twAggSup;
struct SFillInfo* pFillInfo; // fill info
} STableIntervalOperatorInfo;
} SIntervalAggOperatorInfo;
typedef struct SAggOperatorInfo {
SOptrBasicInfo binfo;
@ -478,16 +479,8 @@ typedef struct SFillOperatorInfo {
void** p;
SSDataBlock* existNewGroupBlock;
bool multigroupResult;
SInterval intervalInfo;
} SFillOperatorInfo;
typedef struct {
char* pData;
bool isNull;
int16_t type;
int32_t bytes;
} SGroupKeys, SStateKeys;
typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo;
SArray* pGroupCols; // group by columns, SArray<SColumn>
@ -540,6 +533,7 @@ typedef struct SSessionAggOperatorInfo {
SWindowRowsSup winSup;
bool reptScan; // next round scan
int64_t gap; // session window gap
int32_t tsSlotId; // primary timestamp slot id
STimeWindowAggSupp twAggSup;
} SSessionAggOperatorInfo;
@ -557,6 +551,7 @@ typedef struct SStateWindowOperatorInfo {
int32_t colIndex; // start row index
bool hasKey;
SStateKeys stateKey;
int32_t tsSlotId; // primary timestamp column slot id
STimeWindowAggSupp twAggSup;
// bool reptScan;
} SStateWindowOperatorInfo;
@ -613,6 +608,9 @@ typedef struct SJoinOperatorInfo {
SNode *pOnCondition;
} SJoinOperatorInfo;
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
__optr_decode_fn_t decode, __optr_get_explain_fn_t explain);
@ -623,8 +621,8 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
void doBuildResultDatablock(SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo,
SDiskbasedBuf* pBuf, int32_t* rowCellOffset, SqlFunctionCtx* pCtx);
void doBuildResultDatablock(SOptrBasicInfo *pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf);
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf,
SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
@ -642,6 +640,16 @@ void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols);
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow);
void cleanupAggSup(SAggSupporter* pAggSup);
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput,
int32_t* rowCellInfoOffset);
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo,
char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup);
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
@ -663,10 +671,12 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo,
@ -676,14 +686,15 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pConditions);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal,
SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal,
bool multigroupResult, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo,
const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo);
@ -704,7 +715,7 @@ void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlo
void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput);
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win);
STableQueryInfo* createTableQueryInfo(void* buf, STimeWindow win);
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
int32_t checkForQueryBuf(size_t numOfTables);

View File

@ -27,13 +27,12 @@ extern "C" {
struct SSDataBlock;
typedef struct SFillColInfo {
// STColumn col; // column info
SResSchema col;
int16_t functionId; // sql function id
int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN
int16_t tagIndex; // index of current tag in SFillTagColInfo array list
int32_t offset;
union {int64_t i; double d;} val;
SExprInfo *pExpr;
// SResSchema schema;
// int16_t functionId; // sql function id
int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN
int16_t tagIndex; // index of current tag in SFillTagColInfo array list
SVariant fillVal;
} SFillColInfo;
typedef struct {
@ -56,9 +55,10 @@ typedef struct SFillInfo {
int32_t numOfCols; // number of columns, including the tags columns
int32_t rowSize; // size of each row
SInterval interval;
char * prevValues; // previous row of data, to generate the interpolation results
char * nextValues; // next row of data
char** pData; // original result data block involved in filling data
SArray *prev;
SArray *next;
SSDataBlock *pSrcBlock;
int32_t alloc; // data buffer size in rows
SFillColInfo* pFillCol; // column info for fill operations
@ -72,7 +72,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t
void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp);
void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SValueNode* val);
struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SNodeListNode* val);
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
@ -80,7 +80,7 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3
struct SFillColInfo* pCol, const char* id);
void* taosDestroyFillInfo(struct SFillInfo *pFillInfo);
int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity);
int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity);
int64_t getFillInfoStart(struct SFillInfo *pFillInfo);

View File

@ -154,14 +154,12 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));
bool newgroup = false;
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
int64_t st = 0;
st = taosGetTimestampUs();
*pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &newgroup);
int64_t st = taosGetTimestampUs();
*pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
uint64_t el = (taosGetTimestampUs() - st);
pTaskInfo->cost.elapsedTime += el;
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);

File diff suppressed because it is too large Load Diff

View File

@ -256,7 +256,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
}
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
@ -265,7 +265,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
SSDataBlock* pRes = pInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx);
doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
@ -277,7 +277,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
@ -311,7 +311,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, false);
while(1) {
doBuildResultDatablock(pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx);
doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes);
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
@ -537,11 +537,12 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
pInfo->pageIndex += 1;
blockDataUpdateTsWindow(pInfo->binfo.pRes);
pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId;
return pInfo->binfo.pRes;
}
static SSDataBlock* hashPartition(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
@ -558,7 +559,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator, bool* newgroup) {
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;

View File

@ -257,11 +257,9 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
pTableScanInfo->cond.order = TSDB_ORDER_DESC;
}
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
STableScanInfo* pTableScanInfo = pOperator->info;
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
*newgroup = false;
while (tsdbNextDataBlock(pTableScanInfo->dataReader)) {
if (isTaskKilled(pOperator->pTaskInfo)) {
@ -289,7 +287,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
return NULL;
}
static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
STableScanInfo* pTableScanInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -298,10 +296,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
return NULL;
}
*newgroup = false;
while (pTableScanInfo->current < pTableScanInfo->scanInfo.numOfAsc) {
SSDataBlock* p = doTableScanImpl(pOperator, newgroup);
SSDataBlock* p = doTableScanImpl(pOperator);
if (p != NULL) {
return p;
}
@ -334,7 +330,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
while (pTableScanInfo->current < total) {
SSDataBlock* p = doTableScanImpl(pOperator, newgroup);
SSDataBlock* p = doTableScanImpl(pOperator);
if (p != NULL) {
return p;
}
@ -421,13 +417,12 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle) {
return pOperator;
}
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
STableScanInfo* pTableScanInfo = pOperator->info;
*newgroup = false;
STableBlockDistInfo tableBlockDist = {0};
tableBlockDist.numOfTables = 1; // TODO set the correct number of tables
@ -514,7 +509,7 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
taosArrayClear(pInfo->pBlockLists);
}
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamBlockScanInfo* pInfo = pOperator->info;
@ -859,7 +854,7 @@ static SSDataBlock* buildSysTableMetaBlock() {
return pBlock;
}
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
// build message and send to mnode to fetch the content of system tables.
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSysTableScanInfo* pInfo = pOperator->info;
@ -1191,7 +1186,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
return pOperator;
}
static SSDataBlock* doTagScan(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
#if 0
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {

View File

@ -13,20 +13,21 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "function.h"
#include "os.h"
#include "querynodes.h"
#include "taosdef.h"
#include "tmsg.h"
#include "ttypes.h"
#include "tfill.h"
#include "function.h"
#include "tcommon.h"
#include "thash.h"
#include "ttime.h"
#include "function.h"
#include "tdatablock.h"
#include "executorInt.h"
#include "querynodes.h"
#include "tfill.h"
#define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC)
#define DO_INTERPOLATION(_v1, _v2, _k1, _k2, _k) ((_v1) + ((_v2) - (_v1)) * (((double)(_k)) - ((double)(_k1))) / (((double)(_k2)) - ((double)(_k1))))
@ -37,168 +38,208 @@ static void setTagsValue(SFillInfo* pFillInfo, void** data, int32_t genRows) {
continue;
}
char* val1 = elePtrAt(data[j], pCol->col.bytes, genRows);
SResSchema* pSchema = &pCol->pExpr->base.resSchema;
char* val1 = elePtrAt(data[j], pSchema->bytes, genRows);
assert(pCol->tagIndex >= 0 && pCol->tagIndex < pFillInfo->numOfTags);
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
// assert (pTag->col.colId == pCol->col.colId);
assignVal(val1, pTag->tagVal, pCol->col.bytes, pCol->col.type);
assignVal(val1, pTag->tagVal, pSchema->bytes, pSchema->type);
}
}
static void setNullValueForRow(SFillInfo* pFillInfo, void** data, int32_t numOfCol, int32_t rowIndex) {
static void setNullRow(SSDataBlock* pBlock, int32_t numOfCol, int32_t rowIndex) {
// the first are always the timestamp column, so start from the second column.
for (int32_t i = 1; i < numOfCol; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
char* output = elePtrAt(data[i], pCol->col.bytes, rowIndex);
setNull(output, pCol->col.type, pCol->col.bytes);
for (int32_t i = 1; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, i);
colDataAppendNULL(p, rowIndex);
}
}
static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData, int64_t ts, bool outOfBound) {
char* prev = pFillInfo->prevValues;
char* next = pFillInfo->nextValues;
#define GET_DEST_SLOT_ID(_p) ((_p)->pExpr->base.resSchema.slotId)
#define GET_SRC_SLOT_ID(_p) ((_p)->pExpr->base.pParam[0].pCol->slotId)
static void doSetVal(SColumnInfoData* pDstColInfoData, int32_t rowIndex, const SGroupKeys* pKey);
static void doFillOneRowResult(SFillInfo* pFillInfo, SSDataBlock *pBlock, SSDataBlock* pSrcBlock, int64_t ts, bool outOfBound) {
SPoint point1, point2, point;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
// set the primary timestamp column value
int32_t index = pFillInfo->numOfCurrent;
char* val = elePtrAt(data[0], TSDB_KEYSIZE, index);
SColumnInfoData *pCol0 = taosArrayGet(pBlock->pDataBlock, 0);
char* val = colDataGetData(pCol0, index);
*(TSKEY*) val = pFillInfo->currentKey;
// set the other values
if (pFillInfo->type == TSDB_FILL_PREV) {
char* p = FILL_IS_ASC_FILL(pFillInfo) ? prev : next;
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next;
if (p != NULL) {
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)) {
continue;
}
char* output = elePtrAt(data[i], pCol->col.bytes, index);
// assignVal(output, p + pCol->offset, pCol->col.bytes, pCol->col.type);
}
} else { // no prev value yet, set the value for NULL
setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index);
}
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
char* p = FILL_IS_ASC_FILL(pFillInfo)? next : prev;
if (p != NULL) {
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)) {
continue;
}
char* output = elePtrAt(data[i], pCol->col.bytes, index);
// assignVal(output, p + pCol->offset, pCol->col.bytes, pCol->col.type);
}
} else { // no prev value yet, set the value for NULL
setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index);
}
} else if (pFillInfo->type == TSDB_FILL_LINEAR) {
// TODO : linear interpolation supports NULL value
if (prev != NULL && !outOfBound) {
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)) {
continue;
}
int16_t type = pCol->col.type;
int16_t bytes = pCol->col.bytes;
char *val1 = elePtrAt(data[i], pCol->col.bytes, index);
if (type == TSDB_DATA_TYPE_BINARY|| type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BOOL) {
setNull(val1, pCol->col.type, bytes);
continue;
}
point1 = (SPoint){.key = *(TSKEY*)(prev), .val = prev + pCol->offset};
point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->index * bytes};
point = (SPoint){.key = pFillInfo->currentKey, .val = val1};
taosGetLinearInterpolationVal(&point, type, &point1, &point2, type);
}
} else {
setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index);
}
} else { // fill the default value */
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->col.type)*/) {
if (TSDB_COL_IS_TAG(pCol->flag)) {
continue;
}
char* val1 = elePtrAt(data[i], pCol->col.bytes, index);
assignVal(val1, (char*)&pCol->val, pCol->col.bytes, pCol->col.type);
SGroupKeys* pKey = taosArrayGet(p, i);
SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));
doSetVal(pDstColInfoData, index, pKey);
}
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next : pFillInfo->prev;
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)) {
continue;
}
SGroupKeys* pKey = taosArrayGet(p, i);
SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));
doSetVal(pDstColInfoData, index, pKey);
}
} else if (pFillInfo->type == TSDB_FILL_LINEAR) {
// TODO : linear interpolation supports NULL value
if (outOfBound) {
setNullRow(pBlock, pFillInfo->numOfCols, index);
} else {
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)) {
continue;
}
int32_t srcSlotId = GET_SRC_SLOT_ID(pCol);
int32_t dstSlotId = GET_DEST_SLOT_ID(pCol);
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
int16_t type = pCol->pExpr->base.resSchema.type;
SGroupKeys* pKey = taosArrayGet(pFillInfo->prev, i);
if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pKey->isNull) {
colDataAppendNULL(pDstCol, index);
continue;
}
SGroupKeys* pKey1 = taosArrayGet(pFillInfo->prev, 0);
int64_t prevTs = *(int64_t*)pKey1->pData;
SColumnInfoData* pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
char* data = colDataGetData(pSrcCol, pFillInfo->index);
point1 = (SPoint){.key = prevTs, .val = pKey->pData};
point2 = (SPoint){.key = ts, .val = data};
int64_t out = 0;
point = (SPoint){.key = pFillInfo->currentKey, .val = &out};
taosGetLinearInterpolationVal(&point, type, &point1, &point2, type);
colDataAppend(pDstCol, index, (const char*)&out, false);
}
}
} else if (pFillInfo->type == TSDB_FILL_NULL) { // fill with NULL
setNullRow(pBlock, pFillInfo->numOfCols, index);
} else { // fill with user specified value for each column
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->schema.type)*/) {
continue;
}
SVariant* pVar = &pFillInfo->pFillCol[i].fillVal;
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
float v = 0;
GET_TYPED_DATA(v, float, pVar->nType, &pVar->i);
colDataAppend(pDst, index, (char*)&v, false);
} else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) {
double v = 0;
GET_TYPED_DATA(v, double, pVar->nType, &pVar->i);
colDataAppend(pDst, index, (char*)&v, false);
} else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) {
int64_t v = 0;
GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
colDataAppend(pDst, index, (char*)&v, false);
}
}
}
setTagsValue(pFillInfo, data, index);
pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step, pFillInfo->interval.slidingUnit,
pFillInfo->interval.precision);
// setTagsValue(pFillInfo, data, index);
SInterval* pInterval = &pFillInfo->interval;
pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision);
pFillInfo->numOfCurrent++;
}
static void initBeforeAfterDataBuf(SFillInfo* pFillInfo, char** next) {
if (*next != NULL) {
void doSetVal(SColumnInfoData* pDstCol, int32_t rowIndex, const SGroupKeys* pKey) {
if (pKey->isNull) {
colDataAppendNULL(pDstCol, rowIndex);
} else {
colDataAppend(pDstCol, rowIndex, pKey->pData, false);
}
}
static void initBeforeAfterDataBuf(SFillInfo* pFillInfo) {
if (taosArrayGetSize(pFillInfo->next) > 0) {
return;
}
*next = taosMemoryCalloc(1, pFillInfo->rowSize);
for (int i = 1; i < pFillInfo->numOfCols; i++) {
for (int i = 0; i < pFillInfo->numOfCols; i++) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
setNull(*next + pCol->offset, pCol->col.type, pCol->col.bytes);
SGroupKeys key = {0};
SResSchema* pSchema = &pCol->pExpr->base.resSchema;
key.pData = taosMemoryMalloc(pSchema->bytes);
key.isNull = true;
key.bytes = pSchema->bytes;
key.type = pSchema->type;
taosArrayPush(pFillInfo->next, &key);
key.pData = taosMemoryMalloc(pSchema->bytes);
taosArrayPush(pFillInfo->prev, &key);
}
}
static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, char** srcData, char* buf) {
int32_t rowIndex = pFillInfo->index;
static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bool isNull);
static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SArray* pRow) {
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
memcpy(buf + pCol->offset, srcData[i] + rowIndex * pCol->col.bytes, pCol->col.bytes);
int32_t srcSlotId = GET_SRC_SLOT_ID(&pFillInfo->pFillCol[i]);
SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
bool isNull = colDataIsNull_s(pSrcCol, rowIndex);
char* p = colDataGetData(pSrcCol, rowIndex);
saveColData(pRow, i, p, isNull);
}
}
static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputRows) {
static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t outputRows) {
pFillInfo->numOfCurrent = 0;
char** srcData = pFillInfo->pData;
char** prev = &pFillInfo->prevValues;
char** next = &pFillInfo->nextValues;
// todo make sure the first column is always the primary timestamp column?
SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, 0);
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
bool ascFill = FILL_IS_ASC_FILL(pFillInfo);
if (FILL_IS_ASC_FILL(pFillInfo)) {
assert(pFillInfo->currentKey >= pFillInfo->start);
} else {
assert(pFillInfo->currentKey <= pFillInfo->start);
}
#if 0
ASSERT(ascFill && (pFillInfo->currentKey >= pFillInfo->start) || (!ascFill && (pFillInfo->currentKey <= pFillInfo->start)));
#endif
while (pFillInfo->numOfCurrent < outputRows) {
int64_t ts = ((int64_t*)pFillInfo->pData[0])[pFillInfo->index];
int64_t ts = ((int64_t*)pTsCol->pData)[pFillInfo->index];
// set the next value for interpolation
if ((pFillInfo->currentKey < ts && FILL_IS_ASC_FILL(pFillInfo)) ||
(pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) {
initBeforeAfterDataBuf(pFillInfo, next);
copyCurrentRowIntoBuf(pFillInfo, srcData, *next);
if ((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) {
copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pFillInfo->next);
}
if (((pFillInfo->currentKey < ts && FILL_IS_ASC_FILL(pFillInfo)) || (pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) &&
pFillInfo->numOfCurrent < outputRows) {
// fill the gap between two actual input rows
while (((pFillInfo->currentKey < ts && FILL_IS_ASC_FILL(pFillInfo)) ||
(pFillInfo->currentKey > ts && !FILL_IS_ASC_FILL(pFillInfo))) &&
pFillInfo->numOfCurrent < outputRows) {
doFillOneRowResult(pFillInfo, data, srcData, ts, false);
if (((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) && pFillInfo->numOfCurrent < outputRows) {
// fill the gap between two input rows
while (((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) && pFillInfo->numOfCurrent < outputRows) {
doFillOneRowResult(pFillInfo, pBlock, pFillInfo->pSrcBlock, ts, false);
}
// output buffer is full, abort
@ -208,61 +249,66 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR
}
} else {
assert(pFillInfo->currentKey == ts);
initBeforeAfterDataBuf(pFillInfo, prev);
if (pFillInfo->type == TSDB_FILL_NEXT && (pFillInfo->index + 1) < pFillInfo->numOfRows) {
initBeforeAfterDataBuf(pFillInfo, next);
++pFillInfo->index;
copyCurrentRowIntoBuf(pFillInfo, srcData, *next);
copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pFillInfo->next);
--pFillInfo->index;
}
// assign rows to dst buffer
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->col.type)*/) {
if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->schema.type)*/) {
continue;
}
char* output = elePtrAt(data[i], pCol->col.bytes, pFillInfo->numOfCurrent);
char* src = elePtrAt(srcData[i], pCol->col.bytes, pFillInfo->index);
int32_t srcSlotId = GET_SRC_SLOT_ID(pCol);
int32_t dstSlotId = GET_DEST_SLOT_ID(pCol);
if (i == 0 || (pCol->functionId != FUNCTION_COUNT && !isNull(src, pCol->col.type)) ||
(pCol->functionId == FUNCTION_COUNT && GET_INT64_VAL(src) != 0)) {
assignVal(output, src, pCol->col.bytes, pCol->col.type);
memcpy(*prev + pCol->offset, src, pCol->col.bytes);
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, dstSlotId);
SColumnInfoData* pSrc = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
char* src = colDataGetData(pSrc, pFillInfo->index);
if (i == 0 || (/*pCol->functionId != FUNCTION_COUNT &&*/ !colDataIsNull_s(pSrc, pFillInfo->index)) /*||
(pCol->functionId == FUNCTION_COUNT && GET_INT64_VAL(src) != 0)*/) {
bool isNull = colDataIsNull_s(pSrc, pFillInfo->index);
colDataAppend(pDst, pFillInfo->numOfCurrent, src, isNull);
saveColData(pFillInfo->prev, i, src, isNull);
} else { // i > 0 and data is null , do interpolation
if (pFillInfo->type == TSDB_FILL_PREV) {
assignVal(output, *prev + pCol->offset, pCol->col.bytes, pCol->col.type);
SGroupKeys *pKey = taosArrayGet(pFillInfo->prev, i);
doSetVal(pDst, pFillInfo->numOfCurrent, pKey);
} else if (pFillInfo->type == TSDB_FILL_LINEAR) {
assignVal(output, src, pCol->col.bytes, pCol->col.type);
memcpy(*prev + pCol->offset, src, pCol->col.bytes);
bool isNull = colDataIsNull_s(pSrc, pFillInfo->index);
colDataAppend(pDst, pFillInfo->numOfCurrent, src, isNull);
saveColData(pFillInfo->prev, i, src, isNull);
} else if (pFillInfo->type == TSDB_FILL_NULL) {
colDataAppendNULL(pDst, pFillInfo->numOfCurrent);
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
if (*next) {
assignVal(output, *next + pCol->offset, pCol->col.bytes, pCol->col.type);
} else {
setNull(output, pCol->col.type, pCol->col.bytes);
}
SGroupKeys *pKey = taosArrayGet(pFillInfo->next, i);
doSetVal(pDst, pFillInfo->numOfCurrent, pKey);
} else {
assignVal(output, (char*)&pCol->val, pCol->col.bytes, pCol->col.type);
SVariant* pVar = &pFillInfo->pFillCol[i].fillVal;
colDataAppend(pDst, pFillInfo->numOfCurrent, (char*)&pVar->i, false);
}
}
}
// set the tag value for final result
setTagsValue(pFillInfo, data, pFillInfo->numOfCurrent);
// setTagsValue(pFillInfo, data, pFillInfo->numOfCurrent);
SInterval *pInterval = &pFillInfo->interval;
pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision);
pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step,
pFillInfo->interval.slidingUnit, pFillInfo->interval.precision);
pFillInfo->index += 1;
pFillInfo->numOfCurrent += 1;
}
if (pFillInfo->index >= pFillInfo->numOfRows || pFillInfo->numOfCurrent >= outputRows) {
/* the raw data block is exhausted, next value does not exists */
if (pFillInfo->index >= pFillInfo->numOfRows) {
taosMemoryFreeClear(*next);
}
// if (pFillInfo->index >= pFillInfo->numOfRows) {
// taosMemoryFreeClear(*next);
// }
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
return pFillInfo->numOfCurrent;
}
@ -271,14 +317,24 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR
return pFillInfo->numOfCurrent;
}
static int64_t appendFilledResult(SFillInfo* pFillInfo, void** output, int64_t resultCapacity) {
static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bool isNull) {
SGroupKeys *pKey = taosArrayGet(rowBuf, columnIndex);
if (isNull) {
pKey->isNull = true;
} else {
memcpy(pKey->pData, src, pKey->bytes);
pKey->isNull = false;
}
}
static int64_t appendFilledResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, int64_t resultCapacity) {
/*
* These data are generated according to fill strategy, since the current timestamp is out of the time window of
* real result set. Note that we need to keep the direct previous result rows, to generated the filled data.
*/
pFillInfo->numOfCurrent = 0;
while (pFillInfo->numOfCurrent < resultCapacity) {
doFillOneRowResult(pFillInfo, output, pFillInfo->pData, pFillInfo->start, true);
doFillOneRowResult(pFillInfo, pBlock, pFillInfo->pSrcBlock, pFillInfo->start, true);
}
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
@ -295,15 +351,15 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t
int32_t k = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
SFillColInfo* pColInfo = &pFillInfo->pFillCol[i];
pFillInfo->pData[i] = NULL;
SResSchema* pSchema = &pColInfo->pExpr->base.resSchema;
if (TSDB_COL_IS_TAG(pColInfo->flag) || pColInfo->col.type == TSDB_DATA_TYPE_BINARY) {
if (TSDB_COL_IS_TAG(pColInfo->flag) || pSchema->type == TSDB_DATA_TYPE_BINARY) {
numOfTags += 1;
bool exists = false;
int32_t index = -1;
for (int32_t j = 0; j < k; ++j) {
if (pFillInfo->pTags[j].col.colId == pColInfo->col.slotId) {
if (pFillInfo->pTags[j].col.colId == pSchema->slotId) {
exists = true;
index = j;
break;
@ -311,12 +367,12 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t
}
if (!exists) {
SSchema* pSchema = &pFillInfo->pTags[k].col;
pSchema->colId = pColInfo->col.slotId;
pSchema->type = pColInfo->col.type;
pSchema->bytes = pColInfo->col.bytes;
SSchema* pSchema1 = &pFillInfo->pTags[k].col;
pSchema1->colId = pSchema->slotId;
pSchema1->type = pSchema->type;
pSchema1->bytes = pSchema->bytes;
pFillInfo->pTags[k].tagVal = taosMemoryCalloc(1, pColInfo->col.bytes);
pFillInfo->pTags[k].tagVal = taosMemoryCalloc(1, pSchema->bytes);
pColInfo->tagIndex = k;
k += 1;
@ -325,7 +381,7 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t
}
}
rowsize += pColInfo->col.bytes;
rowsize += pSchema->bytes;
}
pFillInfo->numOfTags = numOfTags;
@ -355,7 +411,6 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag
}
taosResetFillInfo(pFillInfo, skey);
pFillInfo->order = order;
switch(fillType) {
@ -364,6 +419,7 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag
case FILL_MODE_NULL: pFillInfo->type = TSDB_FILL_NULL; break;
case FILL_MODE_LINEAR: pFillInfo->type = TSDB_FILL_LINEAR;break;
case FILL_MODE_NEXT: pFillInfo->type = TSDB_FILL_NEXT; break;
case FILL_MODE_VALUE: pFillInfo->type = TSDB_FILL_SET_VALUE; break;
default:
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
@ -376,7 +432,6 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag
pFillInfo->alloc = capacity;
pFillInfo->id = id;
pFillInfo->interval = *pInterval;
pFillInfo->pData = taosMemoryMalloc(POINTER_BYTES * numOfCols);
// if (numOfTags > 0) {
pFillInfo->pTags = taosMemoryCalloc(numOfCols, sizeof(SFillTagColInfo));
@ -385,6 +440,11 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag
}
// }
pFillInfo->next = taosArrayInit(numOfCols, sizeof(SGroupKeys));
pFillInfo->prev = taosArrayInit(numOfCols, sizeof(SGroupKeys));
initBeforeAfterDataBuf(pFillInfo);
pFillInfo->rowSize = setTagColumnInfo(pFillInfo, pFillInfo->numOfCols, pFillInfo->alloc);
assert(pFillInfo->rowSize > 0);
return pFillInfo;
@ -405,18 +465,15 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
return NULL;
}
taosMemoryFreeClear(pFillInfo->prevValues);
taosMemoryFreeClear(pFillInfo->nextValues);
taosArrayDestroy(pFillInfo->prev);
taosArrayDestroy(pFillInfo->next);
for(int32_t i = 0; i < pFillInfo->numOfTags; ++i) {
taosMemoryFreeClear(pFillInfo->pTags[i].tagVal);
}
taosMemoryFreeClear(pFillInfo->pTags);
taosMemoryFreeClear(pFillInfo->pData);
taosMemoryFreeClear(pFillInfo->pFillCol);
taosMemoryFreeClear(pFillInfo);
return NULL;
}
@ -436,18 +493,7 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey)
}
void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) {
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
SColumnInfoData* pColData = taosArrayGet(pInput->pDataBlock, i);
pFillInfo->pData[i] = pColData->pData;
if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
assert (pTag->col.colId == pCol->col.slotId);
memcpy(pTag->tagVal, pColData->pData, pCol->col.bytes); // TODO not memcpy??
}
}
pFillInfo->pSrcBlock = (SSDataBlock*) pInput;
}
bool taosFillHasMoreResults(SFillInfo* pFillInfo) {
@ -465,8 +511,9 @@ bool taosFillHasMoreResults(SFillInfo* pFillInfo) {
}
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows) {
int64_t* tsList = (int64_t*) pFillInfo->pData[0];
SColumnInfoData* pCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, 0);
int64_t* tsList = (int64_t*) pCol->pData;
int32_t numOfRows = taosNumOfRemainRows(pFillInfo);
TSKEY ekey1 = ekey;
@ -513,7 +560,7 @@ int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint*
return TSDB_CODE_SUCCESS;
}
int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, void** output, int32_t capacity) {
int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity) {
int32_t remain = taosNumOfRemainRows(pFillInfo);
int64_t numOfRes = getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, capacity);
@ -521,9 +568,9 @@ int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, void** output, int32_t cap
// no data existed for fill operation now, append result according to the fill strategy
if (remain == 0) {
appendFilledResult(pFillInfo, output, numOfRes);
appendFilledResult(pFillInfo, p, numOfRes);
} else {
fillResultImpl(pFillInfo, output, (int32_t) numOfRes);
fillResultImpl(pFillInfo, p, (int32_t) numOfRes);
assert(numOfRes == pFillInfo->numOfCurrent);
}
@ -538,28 +585,30 @@ int64_t getFillInfoStart(struct SFillInfo *pFillInfo) {
return pFillInfo->start;
}
struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SValueNode* val) {
int32_t offset = 0;
struct SFillColInfo* pFillCol = taosMemoryCalloc(numOfOutput, sizeof(SFillColInfo));
SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SNodeListNode* pValNode) {
SFillColInfo* pFillCol = taosMemoryCalloc(numOfOutput, sizeof(SFillColInfo));
if (pFillCol == NULL) {
return NULL;
}
size_t len = (pValNode != NULL)? LIST_LENGTH(pValNode->pNodeList):0;
for(int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pExprInfo = &pExpr[i];
SExprInfo* pExprInfo = &pExpr[i];
pFillCol[i].pExpr = pExprInfo;
pFillCol[i].tagIndex = -2;
pFillCol[i].col = pExprInfo->base.resSchema;
pFillCol[i].offset = offset;
pFillCol[i].tagIndex = -2;
// todo refactor
if (len > 0) {
// if the user specified value is less than the column, alway use the last one as the fill value
int32_t index = (i >= len)? (len - 1):i;
SValueNode* pv = (SValueNode*)nodesListGetNode(pValNode->pNodeList, index);
valueNodeToVariant(pv, &pFillCol[i].fillVal);
}
if (pExprInfo->base.numOfParams > 0) {
pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query
}
// pFillCol[i].functionId = pExprInfo->pExpr->_function.functionId;
// pFillCol[i].val.d = *val;
offset += pExprInfo->base.resSchema.bytes;
}
return pFillCol;

File diff suppressed because it is too large Load Diff

View File

@ -55,7 +55,7 @@ typedef struct SDummyInputInfo {
SSDataBlock* pBlock;
} SDummyInputInfo;
SSDataBlock* getDummyBlock(SOperatorInfo* pOperator, bool* newgroup) {
SSDataBlock* getDummyBlock(SOperatorInfo* pOperator) {
SDummyInputInfo* pInfo = static_cast<SDummyInputInfo*>(pOperator->info);
if (pInfo->current >= pInfo->totalPages) {
return NULL;
@ -121,7 +121,7 @@ SSDataBlock* getDummyBlock(SOperatorInfo* pOperator, bool* newgroup) {
return pBlock;
}
SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator, bool* newgroup) {
SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) {
SDummyInputInfo* pInfo = static_cast<SDummyInputInfo*>(pOperator->info);
if (pInfo->current >= pInfo->totalPages) {
return NULL;

View File

@ -78,6 +78,8 @@ typedef struct SDiffInfo {
int64_t i64;
double d64;
} prev;
int64_t prevTs;
} SDiffInfo;
typedef struct SSpreadInfo {
@ -1196,9 +1198,6 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
bool isFirstBlock = (pDiffInfo->hasPrev == false);
int32_t numOfElems = 0;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
// int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
@ -1206,44 +1205,86 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
switch (pInputCol->info.type) {
case TSDB_DATA_TYPE_INT: {
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
int32_t pos = startOffset + (isFirstBlock ? (numOfElems - 1) : numOfElems);
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
if (pDiffInfo->includeNull) {
colDataSetNull_f(pOutput->nullbitmap, pos);
if (tsList != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
if (pCtx->order == TSDB_ORDER_ASC) {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
int32_t pos = startOffset + (isFirstBlock ? (numOfElems - 1) : numOfElems);
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
if (pDiffInfo->includeNull) {
colDataSetNull_f(pOutput->nullbitmap, pos);
if (tsList != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
}
numOfElems += 1;
}
continue;
}
int32_t v = *(int32_t*)colDataGetData(pInputCol, i);
if (pDiffInfo->hasPrev) {
int32_t delta = (int32_t)(v - pDiffInfo->prev.i64); // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) {
colDataSetNull_f(pOutput->nullbitmap, pos);
} else {
colDataAppendInt32(pOutput, pos, &delta);
}
numOfElems += 1;
if (pTsOutput != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
}
}
continue;
}
int32_t v = *(int32_t*)colDataGetData(pInputCol, i);
if (pDiffInfo->hasPrev) {
int32_t delta = (int32_t)(v - pDiffInfo->prev.i64); // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) {
colDataSetNull_f(pOutput->nullbitmap, pos);
} else {
pDiffInfo->prev.i64 = v;
pDiffInfo->hasPrev = true;
numOfElems++;
}
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
int32_t v = *(int32_t*)colDataGetData(pInputCol, i);
int32_t pos = startOffset + numOfElems;
// there is a row of previous data block to be handled in the first place.
if (pDiffInfo->hasPrev) {
int32_t delta = (int32_t)(pDiffInfo->prev.i64 - v); // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) {
colDataSetNull_f(pOutput->nullbitmap, pos);
} else {
colDataAppendInt32(pOutput, pos, &delta);
}
if (pTsOutput != NULL) {
colDataAppendInt64(pTsOutput, pos, &pDiffInfo->prevTs);
}
pDiffInfo->hasPrev = false;
}
// it is not the last row of current block
if (i < pInput->numOfRows + pInput->startRowIndex - 1) {
int32_t next = *(int32_t*)colDataGetData(pInputCol, i + 1);
int32_t delta = v - next; // direct previous may be null
colDataAppendInt32(pOutput, pos, &delta);
}
if (pTsOutput != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
if (pTsOutput != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
}
} else {
pDiffInfo->prev.i64 = v;
if (pTsOutput != NULL) {
pDiffInfo->prevTs = tsList[i];
}
pDiffInfo->hasPrev = true;
}
numOfElems++;
}
pDiffInfo->prev.i64 = v;
pDiffInfo->hasPrev = true;
numOfElems++;
}
break;
}
case TSDB_DATA_TYPE_BIGINT: {
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
@ -1378,7 +1419,7 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
}
// initial value is not set yet
if (!pDiffInfo->hasPrev || numOfElems <= 0) {
if (numOfElems <= 0) {
/*
* 1. current block and blocks before are full of null
* 2. current block may be null value
@ -1386,15 +1427,7 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
assert(pCtx->hasNull);
return 0;
} else {
// for (int t = 0; t < pCtx->tagInfo.numOfTagCols; ++t) {
// SqlFunctionCtx* tagCtx = pCtx->tagInfo.pTagCtxList[t];
// if (tagCtx->functionId == TSDB_FUNC_TAG_DUMMY) {
// aAggs[TSDB_FUNC_TAGPRJ].xFunction(tagCtx);
// }
// }
int32_t forwardStep = (isFirstBlock) ? numOfElems - 1 : numOfElems;
return forwardStep;
return (isFirstBlock) ? numOfElems - 1 : numOfElems;
}
}

View File

@ -1521,7 +1521,7 @@ static int32_t physiFillNodeToJson(const void* pObj, SJson* pJson) {
static int32_t jsonToPhysiFillNode(const SJson* pJson, void* pObj) {
SFillPhysiNode* pNode = (SFillPhysiNode*)pObj;
int32_t code = jsonToPhysiWindowNode(pJson, pObj);
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkFillPhysiPlanMode, pNode->mode);
}

View File

@ -945,7 +945,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
code = qStringToSubplan(qwMsg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("task string to subplan failed, code:%x - %s", code, tstrerror(code));
code = TSDB_CODE_INVALID_MSG;
QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code);
}

View File

@ -949,9 +949,14 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
}
if (pCtx->pSem != NULL) {
tTrace("%s cli conn %p handle resp", pTransInst->label, pConn);
memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp));
tTrace("%s cli conn %p(sync) handle resp", pTransInst->label, pConn);
if (pCtx->pRsp == NULL) {
tTrace("%s cli conn %p(sync) failed to resp, ignore", pTransInst->label, pConn);
} else {
memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp));
}
tsem_post(pCtx->pSem);
pCtx->pRsp = NULL;
} else {
tTrace("%s cli conn %p handle resp", pTransInst->label, pConn);
pTransInst->cfp(pTransInst->parent, pResp, pEpSet);