other: merge 3.0

This commit is contained in:
Cary Xu 2022-05-18 10:34:28 +08:00
commit 4be2d07a51
23 changed files with 276 additions and 827 deletions

View File

@ -403,6 +403,19 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWra
return 0; return 0;
} }
static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaWrapper* pSW) {
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1;
pSW->pSchema = (SSchema*)tDecoderMalloc(pDecoder, pSW->nCols * sizeof(SSchema));
if (pSW->pSchema == NULL) return -1;
for (int32_t i = 0; i < pSW->nCols; i++) {
if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1;
}
return 0;
}
STSchema* tdGetSTSChemaFromSSChema(SSchema** pSchema, int32_t nCols); STSchema* tdGetSTSChemaFromSSChema(SSchema** pSchema, int32_t nCols);
typedef struct { typedef struct {

View File

@ -174,22 +174,8 @@ typedef struct SqlFunctionCtx {
SResultDataInfo resDataInfo; SResultDataInfo resDataInfo;
uint32_t order; // data block scanner order: asc|desc uint32_t order; // data block scanner order: asc|desc
uint8_t scanFlag; // record current running step, default: 0 uint8_t scanFlag; // record current running step, default: 0
////////////////////////////////////////////////////////////////
int32_t startRow; // start row index
int32_t size; // handled processed row number
SColumnInfoData* pInput;
SColumnDataAgg agg;
int16_t inputType; // TODO remove it
int16_t inputBytes; // TODO remove it
bool hasNull; // null value exist in current block, TODO remove it
bool requireNull; // require null in some function, TODO remove it
int32_t columnIndex; // TODO remove it
bool isAggSet;
int64_t startTs; // timestamp range of current query when function is executed on a specific data block, TODO remove it
bool stableQuery;
/////////////////////////////////////////////////////////////////
int16_t functionId; // function id int16_t functionId; // function id
char * pOutput; // final result output buffer, point to sdata->data char *pOutput; // final result output buffer, point to sdata->data
int32_t numOfParams; int32_t numOfParams;
SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
int64_t *ptsList; // corresponding timestamp array list int64_t *ptsList; // corresponding timestamp array list

View File

@ -218,6 +218,13 @@ int32_t stmtParseSql(STscStmt* pStmt) {
pStmt->bInfo.needParse = false; pStmt->bInfo.needParse = false;
if (pStmt->sql.pQuery->pRoot && 0 == pStmt->sql.type) {
pStmt->sql.type = STMT_TYPE_INSERT;
} else if (pStmt->sql.pQuery->pPrepareRoot) {
pStmt->sql.type = STMT_TYPE_QUERY;
}
/*
switch (nodeType(pStmt->sql.pQuery->pRoot)) { switch (nodeType(pStmt->sql.pQuery->pRoot)) {
case QUERY_NODE_VNODE_MODIF_STMT: case QUERY_NODE_VNODE_MODIF_STMT:
if (0 == pStmt->sql.type) { if (0 == pStmt->sql.type) {
@ -231,6 +238,7 @@ int32_t stmtParseSql(STscStmt* pStmt) {
tscError("not supported stmt type %d", nodeType(pStmt->sql.pQuery->pRoot)); tscError("not supported stmt type %d", nodeType(pStmt->sql.pQuery->pRoot));
STMT_ERR_RET(TSDB_CODE_TSC_STMT_CLAUSE_ERROR); STMT_ERR_RET(TSDB_CODE_TSC_STMT_CLAUSE_ERROR);
} }
*/
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -398,11 +398,14 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
if (pCfg->precision < TSDB_MIN_PRECISION && pCfg->precision > TSDB_MAX_PRECISION) return -1; if (pCfg->precision < TSDB_MIN_PRECISION && pCfg->precision > TSDB_MAX_PRECISION) return -1;
if (pCfg->compression < TSDB_MIN_COMP_LEVEL || pCfg->compression > TSDB_MAX_COMP_LEVEL) return -1; if (pCfg->compression < TSDB_MIN_COMP_LEVEL || pCfg->compression > TSDB_MAX_COMP_LEVEL) return -1;
if (pCfg->replications < TSDB_MIN_DB_REPLICA || pCfg->replications > TSDB_MAX_DB_REPLICA) return -1; if (pCfg->replications < TSDB_MIN_DB_REPLICA || pCfg->replications > TSDB_MAX_DB_REPLICA) return -1;
if (pCfg->replications > mndGetDnodeSize(pMnode)) return -1;
if (pCfg->replications != 1 && pCfg->replications != 3) return -1; if (pCfg->replications != 1 && pCfg->replications != 3) return -1;
if (pCfg->strict < TSDB_DB_STRICT_OFF || pCfg->strict > TSDB_DB_STRICT_ON) return -1; if (pCfg->strict < TSDB_DB_STRICT_OFF || pCfg->strict > TSDB_DB_STRICT_ON) return -1;
if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) return -1; if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) return -1;
if (pCfg->hashMethod != 1) return -1; if (pCfg->hashMethod != 1) return -1;
if (pCfg->replications > mndGetDnodeSize(pMnode)) {
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
return -1;
}
terrno = 0; terrno = 0;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1447,8 +1450,8 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
} }
char *status = "ready"; char *status = "ready";
char b[24] = {0}; char statusB[24] = {0};
STR_WITH_SIZE_TO_VARSTR(b, status, strlen(status)); STR_WITH_SIZE_TO_VARSTR(statusB, status, strlen(status));
if (sysDb) { if (sysDb) {
for (int32_t i = 0; i < pShow->numOfColumns; ++i) { for (int32_t i = 0; i < pShow->numOfColumns; ++i) {
@ -1458,7 +1461,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
} else if (i == 3) { } else if (i == 3) {
colDataAppend(pColInfo, rows, (const char *)&numOfTables, false); colDataAppend(pColInfo, rows, (const char *)&numOfTables, false);
} else if (i == 20) { } else if (i == 20) {
colDataAppend(pColInfo, rows, b, false); colDataAppend(pColInfo, rows, statusB, false);
} else { } else {
colDataAppendNULL(pColInfo, rows); colDataAppendNULL(pColInfo, rows);
} }
@ -1481,9 +1484,10 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.replications, false); colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.replications, false);
const char *src = pDb->cfg.strict ? "strict" : "nostrict"; const char *src = pDb->cfg.strict ? "strict" : "nostrict";
STR_WITH_SIZE_TO_VARSTR(b, src, strlen(src)); char strict[24] = {0};
STR_WITH_SIZE_TO_VARSTR(strict, src, strlen(src));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)b, false); colDataAppend(pColInfo, rows, (const char *)strict, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.daysPerFile, false); colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.daysPerFile, false);
@ -1554,7 +1558,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.numOfStables, false); colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.numOfStables, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols); pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
colDataAppend(pColInfo, rows, (const char *)b, false); colDataAppend(pColInfo, rows, (const char *)statusB, false);
} }
} }

View File

@ -502,6 +502,17 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
} }
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
// if (pTopic == NULL) {
// if (dropReq.igNotExists) {
// mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name);
// return 0;
// } else {
// terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
// mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
// return -1;
// }
// }
if (pTopic->refConsumerCnt != 0) { if (pTopic->refConsumerCnt != 0) {
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;

View File

@ -70,7 +70,7 @@ struct SSmaStatItem {
* N.B. only applicable to tsma * N.B. only applicable to tsma
*/ */
int8_t state; // ETsdbSmaStat int8_t state; // ETsdbSmaStat
SHashObj *expiredWindows; // key: skey of time window, value: N/A SHashObj *expiredWindows; // key: skey of time window, value: version
STSma *pTSma; // cache schema STSma *pTSma; // cache schema
}; };

View File

@ -60,7 +60,7 @@ typedef struct {
TSKEY minKey; TSKEY minKey;
} SRtn; } SRtn;
#define TSDB_DATA_DIR_LEN 6 #define TSDB_DATA_DIR_LEN 6 // adapt accordingly
struct STsdb { struct STsdb {
char *path; char *path;
SVnode *pVnode; SVnode *pVnode;

View File

@ -56,8 +56,8 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
if (tDecodeCStr(pCoder, &pME->name) < 0) return -1; if (tDecodeCStr(pCoder, &pME->name) < 0) return -1;
if (pME->type == TSDB_SUPER_TABLE) { if (pME->type == TSDB_SUPER_TABLE) {
if (tDecodeSSchemaWrapper(pCoder, &pME->stbEntry.schema) < 0) return -1; if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schema) < 0) return -1;
if (tDecodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaTag) < 0) return -1; if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaTag) < 0) return -1;
} else if (pME->type == TSDB_CHILD_TABLE) { } else if (pME->type == TSDB_CHILD_TABLE) {
if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1; if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1; if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1;
@ -67,9 +67,9 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1; if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1; if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1; if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1;
if (tDecodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1; if (tDecodeSSchemaWrapperEx(pCoder, &pME->ntbEntry.schema) < 0) return -1;
} else if (pME->type == TSDB_TSMA_TABLE) { } else if (pME->type == TSDB_TSMA_TABLE) {
pME->smaEntry.tsma = taosMemoryCalloc(1, sizeof(STSma)); pME->smaEntry.tsma = tDecoderMalloc(pCoder, sizeof(STSma));
if(!pME->smaEntry.tsma) { if(!pME->smaEntry.tsma) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;

View File

@ -394,11 +394,6 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
goto _err; goto _err;
} }
SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid);
if (pCur == NULL) {
goto _err;
}
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pMeta, 0); metaReaderInit(&mr, pMeta, 0);
int64_t smaId; int64_t smaId;
@ -442,12 +437,10 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
metaReaderClear(&mr); metaReaderClear(&mr);
taosArrayDestroy(pSmaIds); taosArrayDestroy(pSmaIds);
metaCloseSmaCursor(pCur);
return pSW; return pSW;
_err: _err:
metaReaderClear(&mr); metaReaderClear(&mr);
taosArrayDestroy(pSmaIds); taosArrayDestroy(pSmaIds);
metaCloseSmaCursor(pCur);
tdFreeTSmaWrapper(pSW, deepCopy); tdFreeTSmaWrapper(pSW, deepCopy);
return NULL; return NULL;
} }

View File

@ -55,12 +55,13 @@ static inline int tdSmaKeyCmpr(const void *arg1, int len1, const void *arg2, int
} }
static int32_t smaOpenDBDb(TDB **ppDB, TENV *pEnv, const char *pFName) { static int32_t smaOpenDBDb(TDB **ppDB, TENV *pEnv, const char *pFName) {
int ret;
tdb_cmpr_fn_t compFunc; tdb_cmpr_fn_t compFunc;
// Create a database // Create a database
compFunc = tdSmaKeyCmpr; compFunc = tdSmaKeyCmpr;
ret = tdbDbOpen(pFName, -1, -1, compFunc, pEnv, ppDB); if(tdbDbOpen(pFName, -1, -1, compFunc, pEnv, ppDB) < 0) {
return -1;
}
return 0; return 0;
} }
@ -76,7 +77,7 @@ int32_t smaOpenDBF(TENV *pEnv, SDBFile *pDBF) {
// Open DBF // Open DBF
if (smaOpenDBDb(&(pDBF->pDB), pEnv, pDBF->path) < 0) { if (smaOpenDBDb(&(pDBF->pDB), pEnv, pDBF->path) < 0) {
terrno = TSDB_CODE_TDB_INIT_FAILED; smaError("failed to open DBF: %s", pDBF->path);
smaCloseDBDb(pDBF->pDB); smaCloseDBDb(pDBF->pDB);
return -1; return -1;
} }
@ -97,9 +98,10 @@ int32_t smaCloseDBF(SDBFile *pDBF) {
int32_t smaSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn) { int32_t smaSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn) {
int32_t ret; int32_t ret;
ret = tdbDbInsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn); printf("save tsma data into %s, keyLen:%d valLen:%d txn:%p\n", pDBF->path, keyLen, valLen, txn);
ret = tdbDbUpsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn);
if (ret < 0) { if (ret < 0) {
smaError("failed to create insert sma data into db, ret = %d", ret); smaError("failed to upsert tsma data into db, ret = %d", ret);
return -1; return -1;
} }
@ -113,7 +115,7 @@ void *smaGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32_
ret = tdbDbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen); ret = tdbDbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen);
if (ret < 0) { if (ret < 0) {
smaError("failed to get sma data from db, ret = %d", ret); smaError("failed to get tsma data from db, ret = %d", ret);
return NULL; return NULL;
} }

View File

@ -16,21 +16,22 @@
#include "sma.h" #include "sma.h"
#include "tsdb.h" #include "tsdb.h"
typedef STsdbCfg STSmaKeepCfg;
#undef _TEST_SMA_PRINT_DEBUG_LOG_ #undef _TEST_SMA_PRINT_DEBUG_LOG_
#define SMA_STORAGE_TSDB_DAYS 30 #define SMA_STORAGE_TSDB_MINUTES 86400
#define SMA_STORAGE_TSDB_TIMES 10 #define SMA_STORAGE_TSDB_TIMES 10
#define SMA_STORAGE_SPLIT_HOURS 24 #define SMA_STORAGE_SPLIT_FACTOR 144 // least records in tsma file
#define SMA_KEY_LEN 16 // TSKEY+groupId 8+8 #define SMA_KEY_LEN 16 // TSKEY+groupId 8+8
#define SMA_DROP_EXPIRED_TIME 10 // default is 10 seconds #define SMA_DROP_EXPIRED_TIME 10 // default is 10 seconds
#define SMA_STATE_ITEM_HASH_SLOT 32 #define SMA_STATE_ITEM_HASH_SLOT 32
typedef struct { typedef struct {
SSma *pSma; SSma *pSma;
SDBFile dFile; SDBFile dFile;
const SArray *pDataBlocks; // sma data const SArray *pDataBlocks; // sma data
int32_t interval; // interval with the precision of DB int64_t interval; // interval with the precision of DB
} STSmaWriteH; } STSmaWriteH;
typedef struct { typedef struct {
@ -42,10 +43,10 @@ typedef struct {
STsdb *pTsdb; STsdb *pTsdb;
SSma *pSma; SSma *pSma;
SDBFile dFile; SDBFile dFile;
int32_t interval; // interval with the precision of DB int64_t interval; // interval with the precision of DB
int32_t blockSize; // size of SMA block item int32_t blockSize; // size of SMA block item
int32_t days;
int8_t storageLevel; int8_t storageLevel;
int8_t days;
SmaFsIter smaFsIter; SmaFsIter smaFsIter;
} STSmaReadH; } STSmaReadH;
@ -58,7 +59,7 @@ typedef enum {
// static func // static func
static int64_t tdGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision, bool adjusted); static int64_t tdGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision, bool adjusted);
static int32_t tdGetSmaStorageLevel(int64_t interval, int8_t intervalUnit); static int32_t tdGetSmaStorageLevel(STSmaKeepCfg *pCfg, int64_t interval);
static int32_t tdInitTSmaWriteH(STSmaWriteH *pSmaH, SSma *pSma, const SArray *pDataBlocks, int64_t interval, static int32_t tdInitTSmaWriteH(STSmaWriteH *pSmaH, SSma *pSma, const SArray *pDataBlocks, int64_t interval,
int8_t intervalUnit); int8_t intervalUnit);
static int32_t tdInitTSmaReadH(STSmaReadH *pSmaH, SSma *pSma, int64_t interval, int8_t intervalUnit); static int32_t tdInitTSmaReadH(STSmaReadH *pSmaH, SSma *pSma, int64_t interval, int8_t intervalUnit);
@ -92,9 +93,10 @@ static int32_t tdDropTSmaDataImpl(SSma *pSma, int64_t indexUid);
* @return int32_t * @return int32_t
*/ */
static int32_t tdInitTSmaReadH(STSmaReadH *pSmaH, SSma *pSma, int64_t interval, int8_t intervalUnit) { static int32_t tdInitTSmaReadH(STSmaReadH *pSmaH, SSma *pSma, int64_t interval, int8_t intervalUnit) {
STSmaKeepCfg *pCfg = SMA_TSDB_CFG(pSma);
pSmaH->pSma = pSma; pSmaH->pSma = pSma;
pSmaH->interval = tdGetIntervalByPrecision(interval, intervalUnit, SMA_TSDB_CFG(pSma)->precision, true); pSmaH->interval = tdGetIntervalByPrecision(interval, intervalUnit, SMA_TSDB_CFG(pSma)->precision, true);
pSmaH->storageLevel = tdGetSmaStorageLevel(interval, intervalUnit); pSmaH->storageLevel = tdGetSmaStorageLevel(pCfg, interval);
pSmaH->days = tdGetTSmaDays(pSma, pSmaH->interval, pSmaH->storageLevel); pSmaH->days = tdGetTSmaDays(pSma, pSmaH->interval, pSmaH->storageLevel);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -275,11 +277,13 @@ static int32_t tdSetTSmaDataFile(STSmaWriteH *pSmaH, int64_t indexUid, int32_t f
*/ */
static int32_t tdGetTSmaDays(SSma *pSma, int64_t interval, int32_t storageLevel) { static int32_t tdGetTSmaDays(SSma *pSma, int64_t interval, int32_t storageLevel) {
STsdbCfg *pCfg = SMA_TSDB_CFG(pSma); STsdbCfg *pCfg = SMA_TSDB_CFG(pSma);
int32_t daysPerFile = pCfg->days; int32_t daysPerFile = pCfg->days; // unit is minute
if (storageLevel == SMA_STORAGE_LEVEL_TSDB) { if (storageLevel == SMA_STORAGE_LEVEL_TSDB) {
int32_t days = SMA_STORAGE_TSDB_TIMES * (interval / tsTickPerMin[pCfg->precision]); int32_t minutes = SMA_STORAGE_TSDB_TIMES * (interval / tsTickPerMin[pCfg->precision]);
daysPerFile = days > SMA_STORAGE_TSDB_DAYS ? days : SMA_STORAGE_TSDB_DAYS; if (minutes > SMA_STORAGE_TSDB_MINUTES) {
daysPerFile = SMA_STORAGE_TSDB_MINUTES;
}
} }
return daysPerFile; return daysPerFile;
@ -288,46 +292,15 @@ static int32_t tdGetTSmaDays(SSma *pSma, int64_t interval, int32_t storageLevel)
/** /**
* @brief Judge the tSma storage level * @brief Judge the tSma storage level
* *
* @param pCfg
* @param interval * @param interval
* @param intervalUnit
* @return int32_t * @return int32_t
*/ */
static int32_t tdGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) { static int32_t tdGetSmaStorageLevel(STSmaKeepCfg *pCfg, int64_t interval) {
// TODO: configurable for SMA_STORAGE_SPLIT_HOURS? int64_t mInterval = convertTimeFromPrecisionToUnit(interval, pCfg->precision, TIME_UNIT_MINUTE);
switch (intervalUnit) { if (pCfg->days / mInterval >= SMA_STORAGE_SPLIT_FACTOR) {
case TIME_UNIT_HOUR:
if (interval < SMA_STORAGE_SPLIT_HOURS) {
return SMA_STORAGE_LEVEL_DFILESET; return SMA_STORAGE_LEVEL_DFILESET;
} }
break;
case TIME_UNIT_MINUTE:
if (interval < 60 * SMA_STORAGE_SPLIT_HOURS) {
return SMA_STORAGE_LEVEL_DFILESET;
}
break;
case TIME_UNIT_SECOND:
if (interval < 3600 * SMA_STORAGE_SPLIT_HOURS) {
return SMA_STORAGE_LEVEL_DFILESET;
}
break;
case TIME_UNIT_MILLISECOND:
if (interval < 3600 * 1e3 * SMA_STORAGE_SPLIT_HOURS) {
return SMA_STORAGE_LEVEL_DFILESET;
}
break;
case TIME_UNIT_MICROSECOND:
if (interval < 3600 * 1e6 * SMA_STORAGE_SPLIT_HOURS) {
return SMA_STORAGE_LEVEL_DFILESET;
}
break;
case TIME_UNIT_NANOSECOND:
if (interval < 3600 * 1e9 * SMA_STORAGE_SPLIT_HOURS) {
return SMA_STORAGE_LEVEL_DFILESET;
}
break;
default:
break;
}
return SMA_STORAGE_LEVEL_TSDB; return SMA_STORAGE_LEVEL_TSDB;
} }
@ -346,6 +319,7 @@ static int32_t tdGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) {
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
STsdbCfg *pCfg = SMA_TSDB_CFG(pSma); STsdbCfg *pCfg = SMA_TSDB_CFG(pSma);
const SArray *pDataBlocks = (const SArray *)msg; const SArray *pDataBlocks = (const SArray *)msg;
int64_t testSkey = TSKEY_INITIAL_VAL;
// TODO: destroy SSDataBlocks(msg) // TODO: destroy SSDataBlocks(msg)
@ -403,8 +377,8 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
} }
// Step 1: Judge the storage level and days // Step 1: Judge the storage level and days
int32_t storageLevel = tdGetSmaStorageLevel(pTSma->interval, pTSma->intervalUnit); int32_t storageLevel = tdGetSmaStorageLevel(pCfg, tSmaH.interval);
int32_t daysPerFile = tdGetTSmaDays(pSma, tSmaH.interval, storageLevel); int32_t minutePerFile = tdGetTSmaDays(pSma, tSmaH.interval, storageLevel);
char smaKey[SMA_KEY_LEN] = {0}; // key: skey + groupId char smaKey[SMA_KEY_LEN] = {0}; // key: skey + groupId
char dataBuf[512] = {0}; // val: aggr data // TODO: handle 512 buffer? char dataBuf[512] = {0}; // val: aggr data // TODO: handle 512 buffer?
@ -432,6 +406,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
if (!isStartKey) { if (!isStartKey) {
isStartKey = true; isStartKey = true;
skey = *(TSKEY *)var; skey = *(TSKEY *)var;
testSkey = skey;
printf("= skey %" PRIi64 " groupId = %" PRIi64 "|", skey, groupId); printf("= skey %" PRIi64 " groupId = %" PRIi64 "|", skey, groupId);
tdEncodeTSmaKey(groupId, skey, &pSmaKey); tdEncodeTSmaKey(groupId, skey, &pSmaKey);
} else { } else {
@ -503,9 +478,10 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
break; break;
} }
} }
printf("\n");
// if ((tlen > 0) && (skey != TSKEY_INITIAL_VAL)) { // if ((tlen > 0) && (skey != TSKEY_INITIAL_VAL)) {
if (tlen > 0) { if (tlen > 0) {
int32_t fid = (int32_t)(TSDB_KEY_FID(skey, daysPerFile, pCfg->precision)); int32_t fid = (int32_t)(TSDB_KEY_FID(skey, minutePerFile, pCfg->precision));
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index
// file // file
@ -517,6 +493,8 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
smaCloseDBF(&tSmaH.dFile); smaCloseDBF(&tSmaH.dFile);
} }
tdSetTSmaDataFile(&tSmaH, indexUid, fid); tdSetTSmaDataFile(&tSmaH, indexUid, fid);
smaDebug("@@@ vgId:%d write to DBF %s, days:%d, interval:%" PRIi64 ", storageLevel:%" PRIi32 " queryKey:%" PRIi64,
SMA_VID(pSma), tSmaH.dFile.path, minutePerFile, tSmaH.interval, storageLevel, testSkey);
if (smaOpenDBF(pEnv->dbEnv, &tSmaH.dFile) != 0) { if (smaOpenDBF(pEnv->dbEnv, &tSmaH.dFile) != 0) {
smaWarn("vgId:%d open DB file %s failed since %s", SMA_VID(pSma), smaWarn("vgId:%d open DB file %s failed since %s", SMA_VID(pSma),
tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno)); tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno));
@ -528,7 +506,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
} }
if (tdInsertTSmaBlocks(&tSmaH, &smaKey, SMA_KEY_LEN, dataBuf, tlen, &pEnv->txn) != 0) { if (tdInsertTSmaBlocks(&tSmaH, &smaKey, SMA_KEY_LEN, dataBuf, tlen, &pEnv->txn) != 0) {
smaWarn("vgId:%d insert tSma data blocks fail for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64 smaWarn("vgId:%d insert tsma data blocks fail for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64
" since %s", " since %s",
SMA_VID(pSma), indexUid, skey, groupId, tstrerror(terrno)); SMA_VID(pSma), indexUid, skey, groupId, tstrerror(terrno));
tdSmaEndCommit(pEnv); tdSmaEndCommit(pEnv);
@ -536,7 +514,8 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
tdUnRefSmaStat(pSma, pStat); tdUnRefSmaStat(pSma, pStat);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
smaDebug("vgId:%d insert tSma data blocks success for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64,
smaDebug("vgId:%d insert tsma data blocks success for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64,
SMA_VID(pSma), indexUid, skey, groupId); SMA_VID(pSma), indexUid, skey, groupId);
// TODO:tsdbEndTSmaCommit(); // TODO:tsdbEndTSmaCommit();
@ -547,7 +526,6 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
SMA_VID(pSma), skey, tlen, indexUid); SMA_VID(pSma), skey, tlen, indexUid);
} }
printf("\n");
} }
} }
tdSmaEndCommit(pEnv); // TODO: not commit for every insert tdSmaEndCommit(pEnv); // TODO: not commit for every insert
@ -579,13 +557,13 @@ static int32_t tdInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyL
TXN *txn) { TXN *txn) {
SDBFile *pDBFile = &pSmaH->dFile; SDBFile *pDBFile = &pSmaH->dFile;
// TODO: insert sma data blocks into B+Tree(TDB) // TODO: insert tsma data blocks into B+Tree(TDB)
if (smaSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen, txn) != 0) { if (smaSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen, txn) != 0) {
smaWarn("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " fail", smaWarn("vgId:%d insert tsma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " fail",
SMA_VID(pSmaH->pSma), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen); SMA_VID(pSmaH->pSma), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
smaDebug("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " succeed", smaDebug("vgId:%d insert tsma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " succeed",
SMA_VID(pSmaH->pSma), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen); SMA_VID(pSmaH->pSma), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen);
#ifdef _TEST_SMA_PRINT_DEBUG_LOG_ #ifdef _TEST_SMA_PRINT_DEBUG_LOG_
@ -776,6 +754,8 @@ int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY query
tdUnRefSmaStat(pSma, pStat); tdUnRefSmaStat(pSma, pStat);
tdInitTSmaFile(&tReadH, indexUid, querySKey); tdInitTSmaFile(&tReadH, indexUid, querySKey);
smaDebug("### vgId:%d read from DBF %s days:%d, interval:%" PRIi64 ", storageLevel:%" PRIi8 " queryKey:%" PRIi64,
SMA_VID(pSma), tReadH.dFile.path, tReadH.days, tReadH.interval, tReadH.storageLevel, querySKey);
if (smaOpenDBF(pEnv->dbEnv, &tReadH.dFile) != 0) { if (smaOpenDBF(pEnv->dbEnv, &tReadH.dFile) != 0) {
smaWarn("vgId:%d open DBF %s failed since %s", SMA_VID(pSma), tReadH.dFile.path, tstrerror(terrno)); smaWarn("vgId:%d open DBF %s failed since %s", SMA_VID(pSma), tReadH.dFile.path, tstrerror(terrno));
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
@ -783,7 +763,7 @@ int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY query
char smaKey[SMA_KEY_LEN] = {0}; char smaKey[SMA_KEY_LEN] = {0};
void *pSmaKey = &smaKey; void *pSmaKey = &smaKey;
int64_t queryGroupId = 1; int64_t queryGroupId = 0;
tdEncodeTSmaKey(queryGroupId, querySKey, (void **)&pSmaKey); tdEncodeTSmaKey(queryGroupId, querySKey, (void **)&pSmaKey);
smaDebug("vgId:%d get sma data from %s: smaKey %" PRIx64 "-%" PRIx64 ", keyLen %d", SMA_VID(pSma), smaDebug("vgId:%d get sma data from %s: smaKey %" PRIx64 "-%" PRIx64 ", keyLen %d", SMA_VID(pSma),
@ -915,8 +895,8 @@ static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t inde
terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META; terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META;
taosHashCleanup(pItem->expiredWindows); taosHashCleanup(pItem->expiredWindows);
taosMemoryFree(pItem); taosMemoryFree(pItem);
smaWarn("vgId:%d update expired window failed for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), indexUid, smaWarn("vgId:%d set expire window, get tsma meta failed for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
tstrerror(terrno)); indexUid, tstrerror(terrno));
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
pItem->pTSma = pTSma; pItem->pTSma = pTSma;
@ -1021,7 +1001,7 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, SSubmitReq *pMsg, int64_t version)
pSW = tdFreeTSmaWrapper(pSW, false); pSW = tdFreeTSmaWrapper(pSW, false);
break; break;
} }
if (!pSW || (pTSma->tableUid != msgIter.suid)) { if (!pSW || (pTSma && (pTSma->tableUid != msgIter.suid))) {
if (pSW) { if (pSW) {
pSW = tdFreeTSmaWrapper(pSW, false); pSW = tdFreeTSmaWrapper(pSW, false);
} }
@ -1043,6 +1023,7 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, SSubmitReq *pMsg, int64_t version)
interval.slidingUnit = pTSma->slidingUnit; interval.slidingUnit = pTSma->slidingUnit;
} }
// TODO: process multiple tsma for one table uid
TSKEY winSKey = taosTimeTruncate(TD_ROW_KEY(row), &interval, interval.precision); TSKEY winSKey = taosTimeTruncate(TD_ROW_KEY(row), &interval, interval.precision);
if (lastWinSKey != winSKey) { if (lastWinSKey != winSKey) {

View File

@ -42,7 +42,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
int slen = 0; int slen = 0;
*ppTsdb = NULL; *ppTsdb = NULL;
slen = strlen(tfsGetPrimaryPath(pVnode->pTfs)) + strlen(pVnode->path) + strlen(dir) + TSDB_DATA_DIR_LEN + 3; slen = strlen(tfsGetPrimaryPath(pVnode->pTfs)) + strlen(pVnode->path) + strlen(dir) + 3;
// create handle // create handle
pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(*pTsdb) + slen); pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(*pTsdb) + slen);
@ -73,7 +73,8 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
goto _err; goto _err;
} }
tsdbDebug("vgId:%d tsdb is opened for %s", TD_VID(pVnode), pTsdb->path); tsdbDebug("vgId:%d tsdb is opened for %s, days:%d, keep:%d,%d,%d", TD_VID(pVnode), pTsdb->path, pTsdb->keepCfg.days,
pTsdb->keepCfg.keep0, pTsdb->keepCfg.keep1, pTsdb->keepCfg.keep2);
*ppTsdb = pTsdb; *ppTsdb = pTsdb;
return 0; return 0;

View File

@ -372,13 +372,13 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle,
} }
if (level == TSDB_RETENTION_L0) { if (level == TSDB_RETENTION_L0) {
tsdbDebug("vgId:%d readh:%p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, TSDB_RETENTION_L0); tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, TSDB_RETENTION_L0);
return VND_RSMA0(pVnode); return VND_RSMA0(pVnode);
} else if (level == TSDB_RETENTION_L1) { } else if (level == TSDB_RETENTION_L1) {
tsdbDebug("vgId:%d readh:%p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, TSDB_RETENTION_L1); tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, TSDB_RETENTION_L1);
return VND_RSMA1(pVnode); return VND_RSMA1(pVnode);
} else { } else {
tsdbDebug("vgId:%d readh:%p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, TSDB_RETENTION_L2); tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, TSDB_RETENTION_L2);
return VND_RSMA2(pVnode); return VND_RSMA2(pVnode);
} }
} }

View File

@ -876,13 +876,13 @@ static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t ke
TXN *txn) { TXN *txn) {
SDBFile *pDBFile = &pSmaH->dFile; SDBFile *pDBFile = &pSmaH->dFile;
// TODO: insert sma data blocks into B+Tree(TDB) // TODO: insert tsma data blocks into B+Tree(TDB)
if (tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen, txn) != 0) { if (tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen, txn) != 0) {
tsdbWarn("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " fail", tsdbWarn("vgId:%d insert tsma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " fail",
REPO_ID(pSmaH->pTsdb), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen); REPO_ID(pSmaH->pTsdb), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
tsdbDebug("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " succeed", tsdbDebug("vgId:%d insert tsma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " succeed",
REPO_ID(pSmaH->pTsdb), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen); REPO_ID(pSmaH->pTsdb), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen);
#ifdef _TEST_SMA_PRINT_DEBUG_LOG_ #ifdef _TEST_SMA_PRINT_DEBUG_LOG_
@ -1245,7 +1245,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
} }
if (tsdbInsertTSmaBlocks(&tSmaH, &smaKey, SMA_KEY_LEN, dataBuf, tlen, &pEnv->txn) != 0) { if (tsdbInsertTSmaBlocks(&tSmaH, &smaKey, SMA_KEY_LEN, dataBuf, tlen, &pEnv->txn) != 0) {
tsdbWarn("vgId:%d insert tSma data blocks fail for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64 tsdbWarn("vgId:%d insert tsma data blocks fail for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64
" since %s", " since %s",
REPO_ID(pTsdb), indexUid, skey, groupId, tstrerror(terrno)); REPO_ID(pTsdb), indexUid, skey, groupId, tstrerror(terrno));
tsdbSmaEndCommit(pEnv); tsdbSmaEndCommit(pEnv);
@ -1253,7 +1253,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
tsdbUnRefSmaStat(pTsdb, pStat); tsdbUnRefSmaStat(pTsdb, pStat);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
tsdbDebug("vgId:%d insert tSma data blocks success for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64, tsdbDebug("vgId:%d insert tsma data blocks success for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64,
REPO_ID(pTsdb), indexUid, skey, groupId); REPO_ID(pTsdb), indexUid, skey, groupId);
// TODO:tsdbEndTSmaCommit(); // TODO:tsdbEndTSmaCommit();

View File

@ -103,7 +103,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
// open sma // open sma
if (smaOpen(pVnode)) { if (smaOpen(pVnode)) {
vError("vgId:%d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }

View File

@ -602,8 +602,6 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order) { int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
for (int32_t k = 0; k < numOfOutput; ++k) { for (int32_t k = 0; k < numOfOutput; ++k) {
pCtx[k].startTs = pWin->skey;
// keep it temporarily // keep it temporarily
bool hasAgg = pCtx[k].input.colDataAggIsSet; bool hasAgg = pCtx[k].input.colDataAggIsSet;
int32_t numOfRows = pCtx[k].input.numOfRows; int32_t numOfRows = pCtx[k].input.numOfRows;
@ -619,8 +617,8 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow
// not a whole block involved in query processing, statistics data can not be used // not a whole block involved in query processing, statistics data can not be used
// NOTE: the original value of isSet have been changed here // NOTE: the original value of isSet have been changed here
if (pCtx[k].isAggSet && forwardStep < numOfTotal) { if (pCtx[k].input.colDataAggIsSet && forwardStep < numOfTotal) {
pCtx[k].isAggSet = false; pCtx[k].input.colDataAggIsSet = false;
} }
if (fmIsWindowPseudoColumnFunc(pCtx[k].functionId)) { if (fmIsWindowPseudoColumnFunc(pCtx[k].functionId)) {
@ -680,7 +678,7 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pC
int32_t order) { int32_t order) {
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) { for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
pCtx[i].order = order; pCtx[i].order = order;
pCtx[i].size = pBlock->info.rows; pCtx[i].input.numOfRows = pBlock->info.rows;
setBlockStatisInfo(&pCtx[i], &pOperator->pExpr[i], pBlock); setBlockStatisInfo(&pCtx[i], &pOperator->pExpr[i], pBlock);
} }
} }
@ -742,7 +740,8 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) { for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
pCtx[i].order = order; pCtx[i].order = order;
pCtx[i].size = pBlock->info.rows; pCtx[i].input.numOfRows = pBlock->info.rows;
pCtx[i].pSrcBlock = pBlock; pCtx[i].pSrcBlock = pBlock;
pCtx[i].scanFlag = scanFlag; pCtx[i].scanFlag = scanFlag;
@ -827,7 +826,6 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) { static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
for (int32_t k = 0; k < pOperator->numOfExprs; ++k) { for (int32_t k = 0; k < pOperator->numOfExprs; ++k) {
if (functionNeedToExecute(&pCtx[k])) { if (functionNeedToExecute(&pCtx[k])) {
pCtx[k].startTs = startTs;
// todo add a dummy funtion to avoid process check // todo add a dummy funtion to avoid process check
if (pCtx[k].fpSet.process != NULL) { if (pCtx[k].fpSet.process != NULL) {
int32_t code = pCtx[k].fpSet.process(&pCtx[k]); int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
@ -3330,7 +3328,7 @@ static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int3
static void doMergeResultImpl(SSortedMergeOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfExpr, static void doMergeResultImpl(SSortedMergeOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfExpr,
int32_t rowIndex) { int32_t rowIndex) {
for (int32_t j = 0; j < numOfExpr; ++j) { // TODO set row index for (int32_t j = 0; j < numOfExpr; ++j) { // TODO set row index
pCtx[j].startRow = rowIndex; // pCtx[j].startRow = rowIndex;
} }
for (int32_t j = 0; j < numOfExpr; ++j) { for (int32_t j = 0; j < numOfExpr; ++j) {
@ -3381,7 +3379,7 @@ static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock
SqlFunctionCtx* pCtx = pInfo->binfo.pCtx; SqlFunctionCtx* pCtx = pInfo->binfo.pCtx;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
pCtx[i].size = 1; // pCtx[i].size = 1;
} }
for (int32_t i = 0; i < pBlock->info.rows; ++i) { for (int32_t i = 0; i < pBlock->info.rows; ++i) {
@ -4248,7 +4246,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
goto _error; goto _error;
} }
int32_t numOfRows = 10; int32_t numOfRows = 1024;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, numOfRows); initResultSizeInfo(pOperator, numOfRows);

View File

@ -323,6 +323,7 @@ static FORCE_INLINE int32_t getNumofElem(SqlFunctionCtx* pCtx) {
} }
return numOfElem; return numOfElem;
} }
/* /*
* count function does need the finalize, if data is missing, the default value, which is 0, is used * count function does need the finalize, if data is missing, the default value, which is 0, is used
* count function does not use the pCtx->interResBuf to keep the intermediate buffer * count function does not use the pCtx->interResBuf to keep the intermediate buffer
@ -817,16 +818,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
int64_t val = GET_INT64_VAL(tval); int64_t val = GET_INT64_VAL(tval);
if ((prev < val) ^ isMinFunc) { if ((prev < val) ^ isMinFunc) {
pBuf->v = val; pBuf->v = val;
// for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) {
// SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i];
// if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
// __ctx->tag.i = key;
// __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
// }
//
// __ctx->fpSet.process(__ctx);
// }
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
@ -839,15 +830,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
uint64_t val = GET_UINT64_VAL(tval); uint64_t val = GET_UINT64_VAL(tval);
if ((prev < val) ^ isMinFunc) { if ((prev < val) ^ isMinFunc) {
pBuf->v = val; pBuf->v = val;
// for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) {
// SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i];
// if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
// __ctx->tag.i = key;
// __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
// }
//
// __ctx->fpSet.process(__ctx);
// }
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
@ -859,7 +841,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
double val = GET_DOUBLE_VAL(tval); double val = GET_DOUBLE_VAL(tval);
if ((prev < val) ^ isMinFunc) { if ((prev < val) ^ isMinFunc) {
pBuf->v = val; pBuf->v = val;
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
@ -1739,7 +1720,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
char* data = colDataGetData(pCol, i); char* data = colDataGetData(pCol, i);
double v = 0; double v = 0;
GET_TYPED_DATA(v, double, pCtx->inputType, data); GET_TYPED_DATA(v, double, type, data);
if (v < GET_DOUBLE_VAL(&pInfo->minval)) { if (v < GET_DOUBLE_VAL(&pInfo->minval)) {
SET_DOUBLE_VAL(&pInfo->minval, v); SET_DOUBLE_VAL(&pInfo->minval, v);
} }
@ -2552,7 +2533,7 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) {
} }
} }
} else { // computing based on the true data block } else { // computing based on the true data block
if (0 == pCtx->size) { if (0 == pInput->numOfRows) {
if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->order == TSDB_ORDER_DESC) {
if (pCtx->end.key != INT64_MIN) { if (pCtx->end.key != INT64_MIN) {
pInfo->min = pCtx->end.key; pInfo->min = pCtx->end.key;
@ -2571,7 +2552,7 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) {
TSKEY* ptsList = (int64_t*)colDataGetData(pCol, start); TSKEY* ptsList = (int64_t*)colDataGetData(pCol, start);
if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->order == TSDB_ORDER_DESC) {
if (pCtx->start.key == INT64_MIN) { if (pCtx->start.key == INT64_MIN) {
pInfo->max = (pInfo->max < ptsList[pCtx->size - 1]) ? ptsList[pCtx->size - 1] : pInfo->max; pInfo->max = (pInfo->max < ptsList[start + pInput->numOfRows - 1]) ? ptsList[start + pInput->numOfRows - 1] : pInfo->max;
} else { } else {
pInfo->max = pCtx->start.key + 1; pInfo->max = pCtx->start.key + 1;
} }
@ -2591,7 +2572,7 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) {
if (pCtx->end.key != INT64_MIN) { if (pCtx->end.key != INT64_MIN) {
pInfo->max = pCtx->end.key + 1; pInfo->max = pCtx->end.key + 1;
} else { } else {
pInfo->max = ptsList[pCtx->size - 1]; pInfo->max = ptsList[start + pInput->numOfRows - 1];
} }
} }
} }

View File

@ -480,7 +480,7 @@ static bool function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInf
initResultRowEntry(pResultInfo, pCtx->resDataInfo.interBufSize); initResultRowEntry(pResultInfo, pCtx->resDataInfo.interBufSize);
return true; return true;
} }
#if 0
/** /**
* in handling the stable query, function_finalizer is called after the secondary * in handling the stable query, function_finalizer is called after the secondary
* merge being completed, during the first merge procedure, which is executed at the * merge being completed, during the first merge procedure, which is executed at the
@ -497,53 +497,6 @@ static void function_finalizer(SqlFunctionCtx *pCtx) {
doFinalizer(pCtx); doFinalizer(pCtx);
} }
/*
* count function does need the finalize, if data is missing, the default value, which is 0, is used
* count function does not use the pCtx->interResBuf to keep the intermediate buffer
*/
static void count_function(SqlFunctionCtx *pCtx) {
int32_t numOfElem = 0;
/*
* 1. column data missing (schema modified) causes pCtx->hasNull == true. pCtx->isAggSet == true;
* 2. for general non-primary key columns, pCtx->hasNull may be true or false, pCtx->isAggSet == true;
* 3. for primary key column, pCtx->hasNull always be false, pCtx->isAggSet == false;
*/
if (pCtx->isAggSet) {
numOfElem = pCtx->size - pCtx->agg.numOfNull;
} else {
if (pCtx->hasNull) {
for (int32_t i = 0; i < pCtx->size; ++i) {
char *val = GET_INPUT_DATA(pCtx, i);
if (isNull(val, pCtx->inputType)) {
continue;
}
numOfElem += 1;
}
} else {
//when counting on the primary time stamp column and no statistics data is presented, use the size value directly.
numOfElem = pCtx->size;
}
}
if (numOfElem > 0) {
// GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG;
}
*((int64_t *)pCtx->pOutput) += numOfElem;
SET_VAL(pCtx, numOfElem, 1);
}
static void count_func_merge(SqlFunctionCtx *pCtx) {
int64_t *pData = (int64_t *)GET_INPUT_DATA_LIST(pCtx);
for (int32_t i = 0; i < pCtx->size; ++i) {
*((int64_t *)pCtx->pOutput) += pData[i];
}
SET_VAL(pCtx, pCtx->size, 1);
}
/** /**
* 1. If the column value for filter exists, we need to load the SFields, which serves * 1. If the column value for filter exists, we need to load the SFields, which serves
* as the pre-filter to decide if the actual data block is required or not. * as the pre-filter to decide if the actual data block is required or not.
@ -633,113 +586,6 @@ int32_t noDataRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
LOOPCHECK_N(*_data, _list, ctx, tsdbType, sign, notNullElems); \ LOOPCHECK_N(*_data, _list, ctx, tsdbType, sign, notNullElems); \
} while (0) } while (0)
static void do_sum(SqlFunctionCtx *pCtx) {
int32_t notNullElems = 0;
// Only the pre-computing information loaded and actual data does not loaded
if (pCtx->isAggSet) {
notNullElems = pCtx->size - pCtx->agg.numOfNull;
assert(pCtx->size >= pCtx->agg.numOfNull);
if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) {
int64_t *retVal = (int64_t *)pCtx->pOutput;
*retVal += pCtx->agg.sum;
} else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) {
uint64_t *retVal = (uint64_t *)pCtx->pOutput;
*retVal += (uint64_t)pCtx->agg.sum;
} else if (IS_FLOAT_TYPE(pCtx->inputType)) {
double *retVal = (double*) pCtx->pOutput;
SET_DOUBLE_VAL(retVal, *retVal + GET_DOUBLE_VAL((const char*)&(pCtx->agg.sum)));
}
} else { // computing based on the true data block
void *pData = GET_INPUT_DATA_LIST(pCtx);
notNullElems = 0;
if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) {
int64_t *retVal = (int64_t *)pCtx->pOutput;
if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) {
LIST_ADD_N(*retVal, pCtx, pData, int8_t, notNullElems, pCtx->inputType);
} else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) {
LIST_ADD_N(*retVal, pCtx, pData, int16_t, notNullElems, pCtx->inputType);
} else if (pCtx->inputType == TSDB_DATA_TYPE_INT) {
LIST_ADD_N(*retVal, pCtx, pData, int32_t, notNullElems, pCtx->inputType);
} else if (pCtx->inputType == TSDB_DATA_TYPE_BIGINT) {
LIST_ADD_N(*retVal, pCtx, pData, int64_t, notNullElems, pCtx->inputType);
}
} else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) {
uint64_t *retVal = (uint64_t *)pCtx->pOutput;
if (pCtx->inputType == TSDB_DATA_TYPE_UTINYINT) {
LIST_ADD_N(*retVal, pCtx, pData, uint8_t, notNullElems, pCtx->inputType);
} else if (pCtx->inputType == TSDB_DATA_TYPE_USMALLINT) {
LIST_ADD_N(*retVal, pCtx, pData, uint16_t, notNullElems, pCtx->inputType);
} else if (pCtx->inputType == TSDB_DATA_TYPE_UINT) {
LIST_ADD_N(*retVal, pCtx, pData, uint32_t, notNullElems, pCtx->inputType);
} else if (pCtx->inputType == TSDB_DATA_TYPE_UBIGINT) {
LIST_ADD_N(*retVal, pCtx, pData, uint64_t, notNullElems, pCtx->inputType);
}
} else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) {
double *retVal = (double *)pCtx->pOutput;
LIST_ADD_N_DOUBLE(*retVal, pCtx, pData, double, notNullElems, pCtx->inputType);
} else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
double *retVal = (double *)pCtx->pOutput;
LIST_ADD_N_DOUBLE_FLOAT(*retVal, pCtx, pData, float, notNullElems, pCtx->inputType);
}
}
// data in the check operation are all null, not output
SET_VAL(pCtx, notNullElems, 1);
if (notNullElems > 0) {
// GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG;
}
}
static void sum_function(SqlFunctionCtx *pCtx) {
do_sum(pCtx);
// keep the result data in output buffer, not in the intermediate buffer
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
// if (pResInfo->hasResult == DATA_SET_FLAG && pCtx->stableQuery) {
// set the flag for super table query
SSumInfo *pSum = (SSumInfo *)pCtx->pOutput;
pSum->hasResult = DATA_SET_FLAG;
// }
}
static void sum_func_merge(SqlFunctionCtx *pCtx) {
int32_t notNullElems = 0;
GET_TRUE_DATA_TYPE();
assert(pCtx->stableQuery);
for (int32_t i = 0; i < pCtx->size; ++i) {
char * input = GET_INPUT_DATA(pCtx, i);
SSumInfo *pInput = (SSumInfo *)input;
if (pInput->hasResult != DATA_SET_FLAG) {
continue;
}
notNullElems++;
if (IS_SIGNED_NUMERIC_TYPE(type)) {
*(int64_t *)pCtx->pOutput += pInput->isum;
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
*(uint64_t *) pCtx->pOutput += pInput->usum;
} else {
SET_DOUBLE_VAL((double *)pCtx->pOutput, *(double *)pCtx->pOutput + pInput->dsum);
}
}
SET_VAL(pCtx, notNullElems, 1);
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
if (notNullElems > 0) {
//pResInfo->hasResult = DATA_SET_FLAG;
}
}
static int32_t statisRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { static int32_t statisRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
return BLK_DATA_SMA_LOAD; return BLK_DATA_SMA_LOAD;
} }
@ -2735,18 +2581,6 @@ static void deriv_function(SqlFunctionCtx *pCtx) {
GET_RES_INFO(pCtx)->numOfRes += notNullElems; GET_RES_INFO(pCtx)->numOfRes += notNullElems;
} }
#define DIFF_IMPL(ctx, d, type) \
do { \
if ((ctx)->param[1].param.nType == INITIAL_VALUE_NOT_ASSIGNED) { \
(ctx)->param[1].param.nType = (ctx)->inputType; \
*(type *)&(ctx)->param[1].param.i = *(type *)(d); \
} else { \
*(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].param.i)); \
*(type *)(&(ctx)->param[1].param.i) = *(type *)(d); \
*(int64_t *)(ctx)->pTsOutput = GET_TS_DATA(ctx, index); \
} \
} while (0);
// TODO difference in date column // TODO difference in date column
static void diff_function(SqlFunctionCtx *pCtx) { static void diff_function(SqlFunctionCtx *pCtx) {
void *data = GET_INPUT_DATA_LIST(pCtx); void *data = GET_INPUT_DATA_LIST(pCtx);
@ -4072,460 +3906,4 @@ int32_t functionCompatList[] = {
// tid_tag, derivative, blk_info // tid_tag, derivative, blk_info
6, 8, 7, 6, 8, 7,
}; };
#endif
SAggFunctionInfo aggFunc[35] = {{
// 0, count function does not invoke the finalize function
"count",
FUNCTION_TYPE_AGG,
FUNCTION_COUNT,
FUNCTION_COUNT,
BASIC_FUNC_SO,
function_setup,
count_function,
doFinalizer,
count_func_merge,
countRequired,
},
{
// 1
"sum",
FUNCTION_TYPE_AGG,
FUNCTION_SUM,
FUNCTION_SUM,
BASIC_FUNC_SO,
function_setup,
sum_function,
function_finalizer,
sum_func_merge,
statisRequired,
},
{
// 2
"avg",
FUNCTION_TYPE_AGG,
FUNCTION_AVG,
FUNCTION_AVG,
BASIC_FUNC_SO,
function_setup,
avg_function,
avg_finalizer,
avg_func_merge,
statisRequired,
},
{
// 3
"min",
FUNCTION_TYPE_AGG,
FUNCTION_MIN,
FUNCTION_MIN,
BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY,
min_func_setup,
NULL,
function_finalizer,
min_func_merge,
statisRequired,
},
{
// 4
"max",
FUNCTION_TYPE_AGG,
FUNCTION_MAX,
FUNCTION_MAX,
BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY,
max_func_setup,
NULL,
function_finalizer,
max_func_merge,
statisRequired,
},
{
// 5
"stddev",
FUNCTION_TYPE_AGG,
FUNCTION_STDDEV,
FUNCTION_STDDEV_DST,
FUNCSTATE_SO | FUNCSTATE_STREAM,
function_setup,
stddev_function,
stddev_finalizer,
noop1,
dataBlockRequired,
},
{
// 6
"percentile",
FUNCTION_TYPE_AGG,
FUNCTION_PERCT,
FUNCTION_INVALID_ID,
FUNCSTATE_SO | FUNCSTATE_STREAM,
percentile_function_setup,
percentile_function,
percentile_finalizer,
noop1,
dataBlockRequired,
},
{
// 7
"apercentile",
FUNCTION_TYPE_AGG,
FUNCTION_APERCT,
FUNCTION_APERCT,
FUNCSTATE_SO | FUNCSTATE_STREAM | FUNCSTATE_STABLE,
apercentile_function_setup,
apercentile_function,
apercentile_finalizer,
apercentile_func_merge,
dataBlockRequired,
},
{
// 8
"first",
FUNCTION_TYPE_AGG,
FUNCTION_FIRST,
FUNCTION_FIRST_DST,
BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY,
function_setup,
first_function,
function_finalizer,
noop1,
firstFuncRequired,
},
{
// 9
"last",
FUNCTION_TYPE_AGG,
FUNCTION_LAST,
FUNCTION_LAST_DST,
BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY,
function_setup,
last_function,
function_finalizer,
noop1,
lastFuncRequired,
},
{
// 10
"last_row",
FUNCTION_TYPE_AGG,
FUNCTION_LAST_ROW,
FUNCTION_LAST_ROW,
FUNCSTATE_SO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY,
first_last_function_setup,
last_row_function,
last_row_finalizer,
last_dist_func_merge,
dataBlockRequired,
},
{
// 11
"top",
FUNCTION_TYPE_AGG,
FUNCTION_TOP,
FUNCTION_TOP,
FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY,
top_bottom_function_setup,
top_function,
top_bottom_func_finalizer,
top_func_merge,
dataBlockRequired,
},
{
// 12
"bottom",
FUNCTION_TYPE_AGG,
FUNCTION_BOTTOM,
FUNCTION_BOTTOM,
FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY,
top_bottom_function_setup,
bottom_function,
top_bottom_func_finalizer,
bottom_func_merge,
dataBlockRequired,
},
{
// 13
"spread",
FUNCTION_TYPE_AGG,
FUNCTION_SPREAD,
FUNCTION_SPREAD,
BASIC_FUNC_SO,
spread_function_setup,
spread_function,
spread_function_finalizer,
spread_func_merge,
countRequired,
},
{
// 14
"twa",
FUNCTION_TYPE_AGG,
FUNCTION_TWA,
FUNCTION_TWA,
BASIC_FUNC_SO | FUNCSTATE_NEED_TS,
twa_function_setup,
twa_function,
twa_function_finalizer,
twa_function_copy,
dataBlockRequired,
},
{
// 15
"leastsquares",
FUNCTION_TYPE_AGG,
FUNCTION_LEASTSQR,
FUNCTION_INVALID_ID,
FUNCSTATE_SO | FUNCSTATE_STREAM,
leastsquares_function_setup,
leastsquares_function,
leastsquares_finalizer,
noop1,
dataBlockRequired,
},
{
// 16
"dummy",
FUNCTION_TYPE_AGG,
FUNCTION_TS,
FUNCTION_TS,
BASIC_FUNC_SO | FUNCSTATE_NEED_TS,
function_setup,
date_col_output_function,
doFinalizer,
copy_function,
noDataRequired,
},
{
// 17
"ts",
FUNCTION_TYPE_AGG,
FUNCTION_TS_DUMMY,
FUNCTION_TS_DUMMY,
BASIC_FUNC_SO | FUNCSTATE_NEED_TS,
function_setup,
noop1,
doFinalizer,
copy_function,
dataBlockRequired,
},
{
// 18
"tag_dummy",
FUNCTION_TYPE_AGG,
FUNCTION_TAG_DUMMY,
FUNCTION_TAG_DUMMY,
BASIC_FUNC_SO,
function_setup,
tag_function,
doFinalizer,
copy_function,
noDataRequired,
},
{
// 19
"ts",
FUNCTION_TYPE_AGG,
FUNCTION_TS_COMP,
FUNCTION_TS_COMP,
FUNCSTATE_MO | FUNCSTATE_NEED_TS,
ts_comp_function_setup,
ts_comp_function,
ts_comp_finalize,
copy_function,
dataBlockRequired,
},
{
// 20
"tag",
FUNCTION_TYPE_AGG,
FUNCTION_TAG,
FUNCTION_TAG,
BASIC_FUNC_SO,
function_setup,
tag_function,
doFinalizer,
copy_function,
noDataRequired,
},
{//TODO this is a scala function
// 21, column project sql function
"colprj",
FUNCTION_TYPE_AGG,
FUNCTION_PRJ,
FUNCTION_PRJ,
BASIC_FUNC_MO | FUNCSTATE_NEED_TS,
function_setup,
col_project_function,
doFinalizer,
copy_function,
dataBlockRequired,
},
{
// 22, multi-output, tag function has only one result
"tagprj",
FUNCTION_TYPE_AGG,
FUNCTION_TAGPRJ,
FUNCTION_TAGPRJ,
BASIC_FUNC_MO,
function_setup,
tag_project_function,
doFinalizer,
copy_function,
noDataRequired,
},
{
// 23
"arithmetic",
FUNCTION_TYPE_AGG,
FUNCTION_ARITHM,
FUNCTION_ARITHM,
FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS,
function_setup,
arithmetic_function,
doFinalizer,
copy_function,
dataBlockRequired,
},
{
// 24
"diff",
FUNCTION_TYPE_AGG,
FUNCTION_DIFF,
FUNCTION_INVALID_ID,
FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY,
diff_function_setup,
diff_function,
doFinalizer,
noop1,
dataBlockRequired,
},
// distributed version used in two-stage aggregation processes
{
// 25
"first_dist",
FUNCTION_TYPE_AGG,
FUNCTION_FIRST_DST,
FUNCTION_FIRST_DST,
BASIC_FUNC_SO | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY,
first_last_function_setup,
first_dist_function,
function_finalizer,
first_dist_func_merge,
firstDistFuncRequired,
},
{
// 26
"last_dist",
FUNCTION_TYPE_AGG,
FUNCTION_LAST_DST,
FUNCTION_LAST_DST,
BASIC_FUNC_SO | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY,
first_last_function_setup,
last_dist_function,
function_finalizer,
last_dist_func_merge,
lastDistFuncRequired,
},
{
// 27
"stddev", // return table id and the corresponding tags for join match and subscribe
FUNCTION_TYPE_AGG,
FUNCTION_STDDEV_DST,
FUNCTION_AVG,
FUNCSTATE_SO | FUNCSTATE_STABLE,
function_setup,
NULL,
NULL,
NULL,
dataBlockRequired,
},
{
// 28
"interp",
FUNCTION_TYPE_AGG,
FUNCTION_INTERP,
FUNCTION_INTERP,
FUNCSTATE_SO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS ,
function_setup,
interp_function,
doFinalizer,
copy_function,
dataBlockRequired,
},
{
// 29
"rate",
FUNCTION_TYPE_AGG,
FUNCTION_RATE,
FUNCTION_RATE,
BASIC_FUNC_SO | FUNCSTATE_NEED_TS,
rate_function_setup,
rate_function,
rate_finalizer,
rate_func_copy,
dataBlockRequired,
},
{
// 30
"irate",
FUNCTION_TYPE_AGG,
FUNCTION_IRATE,
FUNCTION_IRATE,
BASIC_FUNC_SO | FUNCSTATE_NEED_TS,
rate_function_setup,
irate_function,
rate_finalizer,
rate_func_copy,
dataBlockRequired,
},
{
// 31
"tbid", // return table id and the corresponding tags for join match and subscribe
FUNCTION_TYPE_AGG,
FUNCTION_TID_TAG,
FUNCTION_TID_TAG,
FUNCSTATE_MO | FUNCSTATE_STABLE,
function_setup,
noop1,
noop1,
noop1,
dataBlockRequired,
},
{ //32
"derivative", // return table id and the corresponding tags for join match and subscribe
FUNCTION_TYPE_AGG,
FUNCTION_DERIVATIVE,
FUNCTION_INVALID_ID,
FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY,
deriv_function_setup,
deriv_function,
doFinalizer,
noop1,
dataBlockRequired,
},
{
// 33
"block_dist", // return table id and the corresponding tags for join match and subscribe
FUNCTION_TYPE_AGG,
FUNCTION_BLKINFO,
FUNCTION_BLKINFO,
FUNCSTATE_SO | FUNCSTATE_STABLE,
function_setup,
blockInfo_func,
blockinfo_func_finalizer,
block_func_merge,
dataBlockRequired,
},
{
// 34
"cov", // return table id and the corresponding tags for join match and subscribe
FUNCTION_TYPE_AGG,
FUNCTION_COV,
FUNCTION_COV,
FUNCSTATE_SO | FUNCSTATE_STABLE,
function_setup,
sum_function,
function_finalizer,
sum_func_merge,
statisRequired,
}
};

View File

@ -170,11 +170,11 @@ CaseCfg gCase[] = {
// 22 // 22
{"insert:AUTO1-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, true, true, insertAUTOTest1, 10, 10, 2, 0, 0, 0, 1, -1}, {"insert:AUTO1-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, true, true, insertAUTOTest1, 10, 10, 2, 0, 0, 0, 1, -1},
// {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryColumnTest, 10, 10, 1, 3, 0, 0, 1, 2}, {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryColumnTest, 10, 10, 1, 3, 0, 0, 1, 2},
// {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryMiscTest, 10, 10, 1, 3, 0, 0, 1, 2}, {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryMiscTest, 10, 10, 1, 3, 0, 0, 1, 2},
{"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryColumnTest, 1, 10, 1, 1, 0, 0, 1, 2}, // {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryColumnTest, 1, 10, 1, 1, 0, 0, 1, 2},
{"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryMiscTest, 2, 10, 1, 1, 0, 0, 1, 2}, // {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryMiscTest, 2, 10, 1, 1, 0, 0, 1, 2},
}; };
@ -209,7 +209,7 @@ typedef struct {
int32_t caseRunNum; // total run case num int32_t caseRunNum; // total run case num
} CaseCtrl; } CaseCtrl;
#if 0 #if 1
CaseCtrl gCaseCtrl = { // default CaseCtrl gCaseCtrl = { // default
.bindNullNum = 0, .bindNullNum = 0,
.printCreateTblSql = false, .printCreateTblSql = false,
@ -267,7 +267,7 @@ CaseCtrl gCaseCtrl = {
}; };
#endif #endif
#if 1 #if 0
CaseCtrl gCaseCtrl = { // query case with specified col&oper CaseCtrl gCaseCtrl = { // query case with specified col&oper
.bindNullNum = 1, .bindNullNum = 1,
.printCreateTblSql = false, .printCreateTblSql = false,
@ -292,7 +292,7 @@ CaseCtrl gCaseCtrl = { // query case with specified col&oper
#if 0 #if 0
CaseCtrl gCaseCtrl = { // query case with specified col&oper CaseCtrl gCaseCtrl = { // query case with specified col&oper
.bindNullNum = 0, .bindNullNum = 1,
.printCreateTblSql = true, .printCreateTblSql = true,
.printQuerySql = true, .printQuerySql = true,
.printStmtSql = true, .printStmtSql = true,
@ -309,10 +309,10 @@ CaseCtrl gCaseCtrl = { // query case with specified col&oper
.printRes = true, .printRes = true,
.runTimes = 0, .runTimes = 0,
.caseRunIdx = -1, .caseRunIdx = -1,
.optrIdxListNum = tListLen(optrIdxList), //.optrIdxListNum = tListLen(optrIdxList),
.optrIdxList = optrIdxList, //.optrIdxList = optrIdxList,
.bindColTypeNum = tListLen(bindColTypeList), //.bindColTypeNum = tListLen(bindColTypeList),
.bindColTypeList = bindColTypeList, //.bindColTypeList = bindColTypeList,
.caseIdx = 24, .caseIdx = 24,
.caseNum = 1, .caseNum = 1,
.caseRunNum = 1, .caseRunNum = 1,
@ -665,15 +665,16 @@ void bpGenerateConstInFuncSQL(BindData *data, int32_t tblIdx) {
void generateQueryMiscSQL(BindData *data, int32_t tblIdx) { void generateQueryMiscSQL(BindData *data, int32_t tblIdx) {
if (tblIdx == FUNCTION_TEST_IDX && gCurCase->bindNullNum <= 0) {
bpGenerateConstInFuncSQL(data, tblIdx);
} else {
switch(tblIdx) { switch(tblIdx) {
case 0: case 0:
//TODO FILL TEST //TODO FILL TEST
default: default:
bpGenerateConstInOpSQL(data, tblIdx); bpGenerateConstInOpSQL(data, tblIdx);
break; break;
case FUNCTION_TEST_IDX: }
bpGenerateConstInFuncSQL(data, tblIdx);
break;
} }
if (gCaseCtrl.printStmtSql) { if (gCaseCtrl.printStmtSql) {
@ -1064,6 +1065,8 @@ int32_t prepareQueryMiscData(BindData *data, int32_t tblIdx) {
if (tblIdx == FUNCTION_TEST_IDX) { if (tblIdx == FUNCTION_TEST_IDX) {
gCaseCtrl.numericParam = true; gCaseCtrl.numericParam = true;
} else {
gCaseCtrl.numericParam = false;
} }
for (int b = 0; b < bindNum; b++) { for (int b = 0; b < bindNum; b++) {
@ -1072,6 +1075,8 @@ int32_t prepareQueryMiscData(BindData *data, int32_t tblIdx) {
} }
} }
gCaseCtrl.numericParam = false;
generateQueryMiscSQL(data, tblIdx); generateQueryMiscSQL(data, tblIdx);
return 0; return 0;

View File

@ -39,7 +39,7 @@ sql show databases
print ==> rows: $rows print ==> rows: $rows
print ==> $data(db)[0] $data(db)[1] $data(db)[2] $data(db)[3] $data(db)[4] $data(db)[5] $data(db)[6] $data(db)[7] $data(db)[8] $data(db)[9] $data(db)[10] $data(db)[11] $data(db)[12] print ==> $data(db)[0] $data(db)[1] $data(db)[2] $data(db)[3] $data(db)[4] $data(db)[5] $data(db)[6] $data(db)[7] $data(db)[8] $data(db)[9] $data(db)[10] $data(db)[11] $data(db)[12]
print $data(db)[13] $data(db)[14] $data(db)[15] $data(db)[16] $data(db)[17] $data(db)[18] $data(db)[19] $data(db)[20] print $data(db)[13] $data(db)[14] $data(db)[15] $data(db)[16] $data(db)[17] $data(db)[18] $data(db)[19] $data(db)[20]
if $data(db)[19] != nostrict then if $data(db)[19] != ready then
sleep 100 sleep 100
$loop_cnt = $loop_cnt + 1 $loop_cnt = $loop_cnt + 1
goto check_db_ready goto check_db_ready

View File

@ -39,7 +39,7 @@ sql show databases
print ==> rows: $rows print ==> rows: $rows
print ==> $data(db)[0] $data(db)[1] $data(db)[2] $data(db)[3] $data(db)[4] $data(db)[5] $data(db)[6] $data(db)[7] $data(db)[8] $data(db)[9] $data(db)[10] $data(db)[11] $data(db)[12] print ==> $data(db)[0] $data(db)[1] $data(db)[2] $data(db)[3] $data(db)[4] $data(db)[5] $data(db)[6] $data(db)[7] $data(db)[8] $data(db)[9] $data(db)[10] $data(db)[11] $data(db)[12]
print $data(db)[13] $data(db)[14] $data(db)[15] $data(db)[16] $data(db)[17] $data(db)[18] $data(db)[19] $data(db)[20] print $data(db)[13] $data(db)[14] $data(db)[15] $data(db)[16] $data(db)[17] $data(db)[18] $data(db)[19] $data(db)[20]
if $data(db)[19] != nostrict then if $data(db)[19] != ready then
sleep 100 sleep 100
$loop_cnt = $loop_cnt + 1 $loop_cnt = $loop_cnt + 1
goto check_db_ready goto check_db_ready

View File

@ -31,7 +31,7 @@ sql show databases
print ==> rows: $rows print ==> rows: $rows
print ==> $data(db)[0] $data(db)[1] $data(db)[2] $data(db)[3] $data(db)[4] $data(db)[5] $data(db)[6] $data(db)[7] $data(db)[8] $data(db)[9] $data(db)[10] $data(db)[11] $data(db)[12] print ==> $data(db)[0] $data(db)[1] $data(db)[2] $data(db)[3] $data(db)[4] $data(db)[5] $data(db)[6] $data(db)[7] $data(db)[8] $data(db)[9] $data(db)[10] $data(db)[11] $data(db)[12]
print $data(db)[13] $data(db)[14] $data(db)[15] $data(db)[16] $data(db)[17] $data(db)[18] $data(db)[19] $data(db)[20] print $data(db)[13] $data(db)[14] $data(db)[15] $data(db)[16] $data(db)[17] $data(db)[18] $data(db)[19] $data(db)[20]
if $data(db)[19] != nostrict then if $data(db)[19] != ready then
sleep 100 sleep 100
$loop_cnt = $loop_cnt + 1 $loop_cnt = $loop_cnt + 1
goto check_db_ready goto check_db_ready

View File

@ -372,4 +372,92 @@ if $data25 != 3 then
return -1 return -1
endi endi
sql insert into t1 values(1648791213004,4,2,3,4.1) (1648791213006,5,4,7,9.1) (1648791213004,40,20,30,40.1) (1648791213005,4,2,3,4.1);
sleep 100
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt;
# row 0
if $data01 != 4 then
print ======$data01
return -1
endi
if $data02 != 4 then
print ======$data02
return -1
endi
if $data03 != 14 then
print ======$data03
return -1
endi
if $data04 != 4 then
print ======$data04
return -1
endi
if $data05 != 3 then
print ======$data05
return -1
endi
sql insert into t1 values(1648791223004,4,2,3,4.1) (1648791233006,5,4,7,9.1) (1648791223004,40,20,30,40.1) (1648791233005,4,2,3,4.1);
sleep 100
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 4 then
print ======$data11
# return -1
endi
if $data12 != 4 then
print ======$data12
# return -1
endi
if $data13 != 10 then
print ======$data13
# return -1
endi
if $data14 != 3 then
print ======$data14
# return -1
endi
if $data15 != 1 then
print ======$data15
# return -1
endi
# row 2
if $data21 != 4 then
print ======$data21
# return -1
endi
if $data22 != 4 then
print ======$data22
# return -1
endi
if $data23 != 15 then
print ======$data23
# return -1
endi
if $data24 != 4 then
print ======$data24
# return -1
endi
if $data25 != 3 then
print ======$data25
# return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT