Merge remote-tracking branch 'origin/3.0' into feature/shm

This commit is contained in:
Shengliang Guan 2022-03-24 15:25:59 +08:00
commit 94a62fd9de
26 changed files with 529 additions and 329 deletions

View File

@ -63,9 +63,12 @@ typedef enum {
} ETsdbStatisStatus; } ETsdbStatisStatus;
typedef enum { typedef enum {
TSDB_SMA_STAT_OK = 0, // ready to provide service TSDB_SMA_STAT_UNKNOWN = -1, // unknown
TSDB_SMA_STAT_EXPIRED = 1, // not ready or expired TSDB_SMA_STAT_OK = 0, // ready to provide service
} ETsdbSmaStat; TSDB_SMA_STAT_EXPIRED = 1, // not ready or expired
TSDB_SMA_STAT_DROPPED = 2, // sma dropped
} ETsdbSmaStat; // bit operation
typedef enum { typedef enum {
TSDB_SMA_TYPE_BLOCK = 0, // Block-wise SMA TSDB_SMA_TYPE_BLOCK = 0, // Block-wise SMA
@ -75,8 +78,6 @@ typedef enum {
extern char *qtypeStr[]; extern char *qtypeStr[];
#define TSDB_PORT_DNODEDNODE 5
#define TSDB_PORT_SYNC 10
#define TSDB_PORT_HTTP 11 #define TSDB_PORT_HTTP 11
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -477,7 +477,8 @@ typedef struct {
int32_t tz; // query client timezone int32_t tz; // query client timezone
char intervalUnit; char intervalUnit;
char slidingUnit; char slidingUnit;
char offsetUnit; char offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
int8_t precision;
int64_t interval; int64_t interval;
int64_t sliding; int64_t sliding;
int64_t offset; int64_t offset;

View File

@ -327,7 +327,7 @@ bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType,
struct SFillColInfo* pFillCol, void* handle); struct SFillColInfo* pFillCol, const char* id);
void* taosDestroyFillInfo(struct SFillInfo *pFillInfo); void* taosDestroyFillInfo(struct SFillInfo *pFillInfo);
int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity); int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity);

View File

@ -342,8 +342,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615) #define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615)
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616) #define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616)
#define TSDB_CODE_TDB_TABLE_RECREATED TAOS_DEF_ERROR_CODE(0, 0x0617) #define TSDB_CODE_TDB_TABLE_RECREATED TAOS_DEF_ERROR_CODE(0, 0x0617)
#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x0618) #define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0618)
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0619) #define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x0619)
#define TSDB_CODE_TDB_INVALID_SMA_STAT TAOS_DEF_ERROR_CODE(0, 0x0620)
// query // query
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700) #define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700)

View File

@ -313,7 +313,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) { for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) {
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]); tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]);
} }
if (pReq->rollup && NULL != pReq->stbCfg.pRSmaParam) { if(pReq->rollup && pReq->stbCfg.pRSmaParam) {
SRSmaParam *param = pReq->stbCfg.pRSmaParam; SRSmaParam *param = pReq->stbCfg.pRSmaParam;
tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor); tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor);
tlen += taosEncodeFixedI8(buf, param->delayUnit); tlen += taosEncodeFixedI8(buf, param->delayUnit);
@ -336,12 +336,12 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].bytes); tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].bytes);
tlen += taosEncodeString(buf, pReq->ntbCfg.pSchema[i].name); tlen += taosEncodeString(buf, pReq->ntbCfg.pSchema[i].name);
} }
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nBSmaCols); tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.nBSmaCols);
for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) { for (col_id_t i = 0; i < pReq->ntbCfg.nBSmaCols; ++i) {
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]); tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.pBSmaCols[i]);
} }
if (pReq->rollup && NULL != pReq->stbCfg.pRSmaParam) { if(pReq->rollup && pReq->ntbCfg.pRSmaParam) {
SRSmaParam *param = pReq->stbCfg.pRSmaParam; SRSmaParam *param = pReq->ntbCfg.pRSmaParam;
tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor); tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor);
tlen += taosEncodeFixedI8(buf, param->delayUnit); tlen += taosEncodeFixedI8(buf, param->delayUnit);
tlen += taosEncodeFixedI8(buf, param->nFuncIds); tlen += taosEncodeFixedI8(buf, param->nFuncIds);
@ -424,19 +424,19 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].bytes); buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].bytes);
buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name); buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name);
} }
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols)); buf = taosDecodeFixedI16(buf, &(pReq->ntbCfg.nBSmaCols));
if (pReq->stbCfg.nBSmaCols > 0) { if(pReq->ntbCfg.nBSmaCols > 0) {
pReq->stbCfg.pBSmaCols = (col_id_t *)malloc(pReq->stbCfg.nBSmaCols * sizeof(col_id_t)); pReq->ntbCfg.pBSmaCols = (col_id_t *)malloc(pReq->ntbCfg.nBSmaCols * sizeof(col_id_t));
for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) { for (col_id_t i = 0; i < pReq->ntbCfg.nBSmaCols; ++i) {
buf = taosDecodeFixedI16(buf, pReq->stbCfg.pBSmaCols + i); buf = taosDecodeFixedI16(buf, pReq->ntbCfg.pBSmaCols + i);
} }
} else { } else {
pReq->stbCfg.pBSmaCols = NULL; pReq->ntbCfg.pBSmaCols = NULL;
} }
if (pReq->rollup) { if(pReq->rollup) {
pReq->stbCfg.pRSmaParam = (SRSmaParam *)malloc(sizeof(SRSmaParam)); pReq->ntbCfg.pRSmaParam = (SRSmaParam *)malloc(sizeof(SRSmaParam));
SRSmaParam *param = pReq->stbCfg.pRSmaParam; SRSmaParam *param = pReq->ntbCfg.pRSmaParam;
buf = taosDecodeFixedU32(buf, (uint32_t *)&param->xFilesFactor); buf = taosDecodeFixedU32(buf, (uint32_t*)&param->xFilesFactor);
buf = taosDecodeFixedI8(buf, &param->delayUnit); buf = taosDecodeFixedI8(buf, &param->delayUnit);
buf = taosDecodeFixedI8(buf, &param->nFuncIds); buf = taosDecodeFixedI8(buf, &param->nFuncIds);
if (param->nFuncIds > 0) { if (param->nFuncIds > 0) {
@ -448,7 +448,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
} }
buf = taosDecodeFixedI64(buf, &param->delay); buf = taosDecodeFixedI64(buf, &param->delay);
} else { } else {
pReq->stbCfg.pRSmaParam = NULL; pReq->ntbCfg.pRSmaParam = NULL;
} }
break; break;
default: default:

View File

@ -51,7 +51,7 @@ int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg);
int metaDropTable(SMeta *pMeta, tb_uid_t uid); int metaDropTable(SMeta *pMeta, tb_uid_t uid);
int metaCommit(SMeta *pMeta); int metaCommit(SMeta *pMeta);
int32_t metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg); int32_t metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg);
int32_t metaDropTSma(SMeta *pMeta, char *indexName); int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid);
// For Query // For Query
STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid); STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid);

View File

@ -96,6 +96,7 @@ int tsdbCommit(STsdb *pTsdb);
*/ */
int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg); int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg);
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg); int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg);
int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid);
/** /**
* @brief Insert RSma(Time-range-wise Rollup SMA) data. * @brief Insert RSma(Time-range-wise Rollup SMA) data.

View File

@ -34,7 +34,7 @@ void metaCloseDB(SMeta* pMeta);
int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg); int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg);
int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid); int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid);
int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg); int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg);
int metaRemoveSmaFromDb(SMeta* pMeta, const char* indexName); int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid);
// SMetaCache // SMetaCache
int metaOpenCache(SMeta* pMeta); int metaOpenCache(SMeta* pMeta);

View File

@ -16,15 +16,15 @@
#ifndef _TD_TSDB_SMA_H_ #ifndef _TD_TSDB_SMA_H_
#define _TD_TSDB_SMA_H_ #define _TD_TSDB_SMA_H_
typedef struct SSmaStat SSmaStat; typedef struct SSmaStat SSmaStat;
typedef struct SSmaEnv SSmaEnv; typedef struct SSmaEnv SSmaEnv;
struct SSmaEnv { struct SSmaEnv {
TdThreadRwlock lock; TdThreadRwlock lock;
SDiskID did; SDiskID did;
TDBEnv dbEnv; // TODO: If it's better to put it in smaIndex level? TDBEnv dbEnv; // TODO: If it's better to put it in smaIndex level?
char * path; // relative path char *path; // relative path
SSmaStat * pStat; SSmaStat *pStat;
}; };
#define SMA_ENV_LOCK(env) ((env)->lock) #define SMA_ENV_LOCK(env) ((env)->lock)

View File

@ -259,7 +259,7 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
return 0; return 0;
} }
int metaRemoveSmaFromDb(SMeta *pMeta, const char *indexName) { int metaRemoveSmaFromDb(SMeta *pMeta, int64_t indexUid) {
// TODO // TODO
#if 0 #if 0
DBT key = {0}; DBT key = {0};

View File

@ -121,11 +121,11 @@ int32_t metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t metaDropTSma(SMeta *pMeta, char* indexName) { int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid) {
// TODO: Validate the cfg // TODO: Validate the cfg
// TODO: add atomicity // TODO: add atomicity
if (metaRemoveSmaFromDb(pMeta, indexName) < 0) { if (metaRemoveSmaFromDb(pMeta, indexUid) < 0) {
// TODO: handle error // TODO: handle error
return -1; return -1;
} }

View File

@ -1377,7 +1377,6 @@ static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
} }
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) { static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
char* pData = NULL;
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1 : -1;
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0]; SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
@ -1454,14 +1453,12 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
return numOfRows + num; return numOfRows + num;
} }
// TODO fix bug for reverse copy data // TODO fix bug for reverse copy data problem
// TODO handle the null data
// Note: row1 always has high priority // Note: row1 always has high priority
static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, STSRow* row1, static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, STSRow* row1,
STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema* pSchema2, STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema* pSchema2,
bool forceSetNull) { bool forceSetNull) {
#if 1 #if 1
char* pData = NULL;
STSchema* pSchema; STSchema* pSchema;
STSRow* row; STSRow* row;
int16_t colId; int16_t colId;
@ -1503,12 +1500,6 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
while(i < numOfCols && (j < numOfColsOfRow1 || k < numOfColsOfRow2)) { while(i < numOfCols && (j < numOfColsOfRow1 || k < numOfColsOfRow2)) {
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else {
pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes;
}
int32_t colIdOfRow1; int32_t colIdOfRow1;
if(j >= numOfColsOfRow1) { if(j >= numOfColsOfRow1) {
colIdOfRow1 = INT32_MAX; colIdOfRow1 = INT32_MAX;
@ -1571,43 +1562,11 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
if (colId == pColInfo->info.colId) { if (colId == pColInfo->info.colId) {
if (tdValTypeIsNorm(sVal.valType)) { if (tdValTypeIsNorm(sVal.valType)) {
switch (pColInfo->info.type) { colDataAppend(pColInfo, numOfRows, sVal.val, false);
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
memcpy(pData, sVal.val, varDataTLen(sVal.val));
break;
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT:
*(uint8_t *)pData = *(uint8_t *)sVal.val;
break;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT:
*(uint16_t *)pData = *(uint16_t *)sVal.val;
break;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
*(uint32_t *)pData = *(uint32_t *)sVal.val;
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
*(uint64_t *)pData = *(uint64_t *)sVal.val;
break;
case TSDB_DATA_TYPE_FLOAT:
SET_FLOAT_PTR(pData, sVal.val);
break;
case TSDB_DATA_TYPE_DOUBLE:
SET_DOUBLE_PTR(pData, sVal.val);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
*(TSKEY*)pData = *(TSKEY*)sVal.val;
break;
default:
memcpy(pData, sVal.val, pColInfo->info.bytes);
}
} else if (forceSetNull) { } else if (forceSetNull) {
colDataAppend(pColInfo, numOfRows, NULL, true); colDataAppend(pColInfo, numOfRows, NULL, true);
} }
i++; i++;
if(row == row1) { if(row == row1) {

View File

@ -25,6 +25,7 @@ static const char *TSDB_SMA_DNAME[] = {
#define SMA_STORAGE_TSDB_TIMES 10 #define SMA_STORAGE_TSDB_TIMES 10
#define SMA_STORAGE_SPLIT_HOURS 24 #define SMA_STORAGE_SPLIT_HOURS 24
#define SMA_KEY_LEN 18 // tableUid_colId_TSKEY 8+2+8 #define SMA_KEY_LEN 18 // tableUid_colId_TSKEY 8+2+8
#define SMA_DROP_EXPIRED_TIME 10 // default is 10 seconds
#define SMA_STATE_HASH_SLOT 4 #define SMA_STATE_HASH_SLOT 4
#define SMA_STATE_ITEM_HASH_SLOT 32 #define SMA_STATE_ITEM_HASH_SLOT 32
@ -60,10 +61,11 @@ typedef struct {
typedef struct { typedef struct {
/** /**
* @brief The field 'state' is here to demonstrate if one smaIndex is ready to provide service. * @brief The field 'state' is here to demonstrate if one smaIndex is ready to provide service.
* - TSDB_SMA_STAT_EXPIRED: 1) If sma calculation of history TS data is not finished; 2) Or if the TSDB is open,
* without information about its previous state.
* - TSDB_SMA_STAT_OK: 1) The sma calculation of history data is finished; 2) Or recevied information from * - TSDB_SMA_STAT_OK: 1) The sma calculation of history data is finished; 2) Or recevied information from
* Streaming Module or TSDB local persistence. * Streaming Module or TSDB local persistence.
* - TSDB_SMA_STAT_EXPIRED: 1) If sma calculation of history TS data is not finished; 2) Or if the TSDB is open,
* without information about its previous state.
* - TSDB_SMA_STAT_DROPPED: 1)sma dropped
*/ */
int8_t state; // ETsdbSmaStat int8_t state; // ETsdbSmaStat
SHashObj *expiredWindows; // key: skey of time window, value: N/A SHashObj *expiredWindows; // key: skey of time window, value: N/A
@ -80,6 +82,7 @@ struct SSmaStat {
// expired window // expired window
static int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg); static int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg);
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat); static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat);
static void * tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem);
static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat); static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat);
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did); static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did);
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SDiskID did, SSmaEnv **pEnv); static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SDiskID did, SSmaEnv **pEnv);
@ -109,7 +112,55 @@ static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[])
static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg); static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg);
static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg); static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg);
// mgmt interface
static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid);
// implementation // implementation
static FORCE_INLINE int8_t tsdbSmaStat(SSmaStatItem *pStatItem) {
if (pStatItem) {
return atomic_load_8(&pStatItem->state);
}
return TSDB_SMA_STAT_UNKNOWN;
}
static FORCE_INLINE bool tsdbSmaStatIsOK(SSmaStatItem *pStatItem, int8_t *state) {
if(!pStatItem) {
return false;
}
if (state) {
*state = atomic_load_8(&pStatItem->state);
return *state == TSDB_SMA_STAT_OK;
}
return atomic_load_8(&pStatItem->state) == TSDB_SMA_STAT_OK;
}
static FORCE_INLINE bool tsdbSmaStatIsExpired(SSmaStatItem *pStatItem) {
return pStatItem ? (atomic_load_8(&pStatItem->state) & TSDB_SMA_STAT_EXPIRED) : true;
}
static FORCE_INLINE bool tsdbSmaStatIsDropped(SSmaStatItem *pStatItem) {
return pStatItem ? (atomic_load_8(&pStatItem->state) & TSDB_SMA_STAT_DROPPED) : true;
}
static FORCE_INLINE void tsdbSmaStatSetOK(SSmaStatItem *pStatItem) {
if (pStatItem) {
atomic_store_8(&pStatItem->state, TSDB_SMA_STAT_OK);
}
}
static FORCE_INLINE void tsdbSmaStatSetExpired(SSmaStatItem *pStatItem) {
if (pStatItem) {
atomic_or_fetch_8(&pStatItem->state, TSDB_SMA_STAT_EXPIRED);
}
}
static FORCE_INLINE void tsdbSmaStatSetDropped(SSmaStatItem *pStatItem) {
if (pStatItem) {
atomic_or_fetch_8(&pStatItem->state, TSDB_SMA_STAT_DROPPED);
}
}
static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) { static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) {
snprintf(dirName, TSDB_FILENAME_LEN, "vnode%svnode%d%stsdb%s%s", TD_DIRSEP, vgId, TD_DIRSEP, TD_DIRSEP, snprintf(dirName, TSDB_FILENAME_LEN, "vnode%svnode%d%stsdb%s%s", TD_DIRSEP, vgId, TD_DIRSEP, TD_DIRSEP,
TSDB_SMA_DNAME[smaType]); TSDB_SMA_DNAME[smaType]);
@ -252,6 +303,16 @@ static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) {
return pItem; return pItem;
} }
static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem) {
if (pSmaStatItem != NULL) {
tdDestroyTSma(pSmaStatItem->pSma);
tfree(pSmaStatItem->pSma);
taosHashCleanup(pSmaStatItem->expiredWindows);
tfree(pSmaStatItem);
}
return NULL;
}
/** /**
* @brief Release resources allocated for its member fields, not including itself. * @brief Release resources allocated for its member fields, not including itself.
* *
@ -264,12 +325,7 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
void *item = taosHashIterate(pSmaStat->smaStatItems, NULL); void *item = taosHashIterate(pSmaStat->smaStatItems, NULL);
while (item != NULL) { while (item != NULL) {
SSmaStatItem *pItem = *(SSmaStatItem **)item; SSmaStatItem *pItem = *(SSmaStatItem **)item;
if (pItem != NULL) { tsdbFreeSmaStatItem(pItem);
tdDestroyTSma(pItem->pSma);
tfree(pItem->pSma);
taosHashCleanup(pItem->expiredWindows);
tfree(pItem);
}
item = taosHashIterate(pSmaStat->smaStatItems, item); item = taosHashIterate(pSmaStat->smaStatItems, item);
} }
taosHashCleanup(pSmaStat->smaStatItems); taosHashCleanup(pSmaStat->smaStatItems);
@ -437,6 +493,15 @@ static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t ind
skey, indexUid); skey, indexUid);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
// TODO: use a standalone interface to received state upate notification from stream computing module.
/**
* @brief state
* - When SMA env init in TSDB, its status is TSDB_SMA_STAT_OK.
* - In startup phase of stream computing module, it should notify the SMA env in TSDB to expired if needed(e.g.
* when batch data caculation not finised)
* - When TSDB_SMA_STAT_OK, the stream computing module should also notify that to the SMA env in TSDB.
*/
pItem->state = TSDB_SMA_STAT_OK;
} else { } else {
// error handling // error handling
tsdbUnRefSmaStat(pTsdb, pStat); tsdbUnRefSmaStat(pTsdb, pStat);
@ -711,6 +776,150 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
STsdbCfg * pCfg = REPO_CFG(pTsdb); STsdbCfg * pCfg = REPO_CFG(pTsdb);
STSmaDataWrapper *pData = (STSmaDataWrapper *)msg; STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
SSmaEnv * pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv); SSmaEnv * pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv);
int64_t indexUid = SMA_TEST_INDEX_UID;
if (pEnv == NULL) {
terrno = TSDB_CODE_INVALID_PTR;
tsdbWarn("vgId:%d insert tSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
return terrno;
}
if (pData->dataLen <= 0) {
TASSERT(0);
terrno = TSDB_CODE_INVALID_PARA;
return TSDB_CODE_FAILED;
}
STSmaWriteH tSmaH = {0};
if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData) != 0) {
return TSDB_CODE_FAILED;
}
SSmaStat *pStat = SMA_ENV_STAT(pTsdb->pTSmaEnv);
SSmaStatItem *pItem = NULL;
tsdbRefSmaStat(pTsdb, pStat);
if (pStat && pStat->smaStatItems) {
pItem = taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid));
}
if ((pItem == NULL) || ((pItem = *(SSmaStatItem **)pItem) == NULL) || tsdbSmaStatIsDropped(pItem)) {
terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
tsdbUnRefSmaStat(pTsdb, pStat);
return TSDB_CODE_FAILED;
}
char rPath[TSDB_FILENAME_LEN] = {0};
char aPath[TSDB_FILENAME_LEN] = {0};
snprintf(rPath, TSDB_FILENAME_LEN, "%s%s%" PRIi64, SMA_ENV_PATH(pEnv), TD_DIRSEP, indexUid);
tfsAbsoluteName(REPO_TFS(pTsdb), SMA_ENV_DID(pEnv), rPath, aPath);
if (!taosCheckExistFile(aPath)) {
if (tfsMkdirRecurAt(REPO_TFS(pTsdb), rPath, SMA_ENV_DID(pEnv)) != TSDB_CODE_SUCCESS) {
tsdbUnRefSmaStat(pTsdb, pStat);
return TSDB_CODE_FAILED;
}
}
// Step 1: Judge the storage level and days
int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit);
int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel);
int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision));
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
// - Set and open the DFile or the B+Tree file
// TODO: tsdbStartTSmaCommit();
tsdbSetTSmaDataFile(&tSmaH, pData, indexUid, fid);
if (tsdbOpenDBF(pTsdb->pTSmaEnv->dbEnv, &tSmaH.dFile) != 0) {
tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb),
tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno));
tsdbDestroyTSmaWriteH(&tSmaH);
tsdbUnRefSmaStat(pTsdb, pStat);
return TSDB_CODE_FAILED;
}
if (tsdbInsertTSmaDataSection(&tSmaH, pData) != 0) {
tsdbWarn("vgId:%d insert tSma data section failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
tsdbDestroyTSmaWriteH(&tSmaH);
tsdbUnRefSmaStat(pTsdb, pStat);
return TSDB_CODE_FAILED;
}
// TODO:tsdbEndTSmaCommit();
// Step 3: reset the SSmaStat
tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey);
tsdbDestroyTSmaWriteH(&tSmaH);
tsdbUnRefSmaStat(pTsdb, pStat);
return TSDB_CODE_SUCCESS;
}
/**
* @brief Drop tSma data and local cache
* - insert/query reference
* @param pTsdb
* @param msg
* @return int32_t
*/
static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid) {
SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv);
// clear local cache
if (pEnv) {
tsdbDebug("vgId:%d drop tSma local cache for %" PRIi64, REPO_ID(pTsdb), indexUid);
SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pEnv), &indexUid, sizeof(indexUid));
if ((pItem != NULL) || ((pItem = *(SSmaStatItem **)pItem) != NULL)) {
if (tsdbSmaStatIsDropped(pItem)) {
tsdbDebug("vgId:%d tSma stat is already dropped for %" PRIi64, REPO_ID(pTsdb), indexUid);
return TSDB_CODE_TDB_INVALID_ACTION; // TODO: duplicate drop msg would be intercepted by mnode
}
tsdbWLockSma(pEnv);
if (tsdbSmaStatIsDropped(pItem)) {
tsdbUnLockSma(pEnv);
tsdbDebug("vgId:%d tSma stat is already dropped for %" PRIi64, REPO_ID(pTsdb), indexUid);
return TSDB_CODE_TDB_INVALID_ACTION; // TODO: duplicate drop msg would be intercepted by mnode
}
tsdbSmaStatSetDropped(pItem);
tsdbUnLockSma(pEnv);
int32_t nSleep = 0;
while (true) {
if (T_REF_VAL_GET(SMA_ENV_STAT(pEnv)) <= 0) {
break;
}
taosSsleep(1);
if (++nSleep > SMA_DROP_EXPIRED_TIME) {
break;
};
}
tsdbFreeSmaStatItem(pItem);
tsdbDebug("vgId:%d getTSmaDataImpl failed since no index %" PRIi64 " in local cache", REPO_ID(pTsdb), indexUid);
}
}
// clear sma data files
// TODO:
}
static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t fid) {
STsdb *pTsdb = pSmaH->pTsdb;
char tSmaFile[TSDB_FILENAME_LEN] = {0};
snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.rsma", REPO_ID(pTsdb), fid);
pSmaH->dFile.path = strdup(tSmaFile);
return TSDB_CODE_SUCCESS;
}
static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
STsdbCfg * pCfg = REPO_CFG(pTsdb);
STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
SSmaEnv * pEnv = atomic_load_ptr(&pTsdb->pRSmaEnv);
if (pEnv == NULL) { if (pEnv == NULL) {
terrno = TSDB_CODE_INVALID_PTR; terrno = TSDB_CODE_INVALID_PTR;
@ -771,51 +980,6 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t fid) {
STsdb *pTsdb = pSmaH->pTsdb;
char tSmaFile[TSDB_FILENAME_LEN] = {0};
snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.rsma", REPO_ID(pTsdb), fid);
pSmaH->dFile.path = strdup(tSmaFile);
return TSDB_CODE_SUCCESS;
}
static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
STsdbCfg * pCfg = REPO_CFG(pTsdb);
STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
STSmaWriteH tSmaH = {0};
tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData);
if (pData->dataLen <= 0) {
TASSERT(0);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
// Step 1: Judge the storage level
int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit);
int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : pCfg->daysPerFile;
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
// - Set and open the DFile or the B+Tree file
int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision));
// Save all the TSma data to one file
// TODO: tsdbStartTSmaCommit();
tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid);
tsdbInsertTSmaDataSection(&tSmaH, pData);
// TODO:tsdbEndTSmaCommit();
// reset the SSmaStat
tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pRSmaEnv), pData->indexUid, pData->skey);
return TSDB_CODE_SUCCESS;
}
/** /**
* @brief * @brief
* *
@ -934,6 +1098,15 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_
#endif #endif
#if 1 #if 1
int8_t smaStat = 0;
if (!tsdbSmaStatIsOK(pItem, &smaStat)) { // TODO: multiple check for large scale sma query
tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv));
terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
tsdbWarn("vgId:%d getTSmaDataImpl failed from index %" PRIi64 " since %s %" PRIi8, REPO_ID(pTsdb), indexUid,
tstrerror(terrno), smaStat);
return TSDB_CODE_FAILED;
}
if (taosHashGet(pItem->expiredWindows, &querySKey, sizeof(TSKEY)) != NULL) { if (taosHashGet(pItem->expiredWindows, &querySKey, sizeof(TSKEY)) != NULL) {
// TODO: mark this window as expired. // TODO: mark this window as expired.
tsdbDebug("vgId:%d skey %" PRIi64 " of window exists in expired window for index %" PRIi64, REPO_ID(pTsdb), tsdbDebug("vgId:%d skey %" PRIi64 " of window exists in expired window for index %" PRIi64, REPO_ID(pTsdb),
@ -1086,6 +1259,20 @@ int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) {
return code; return code;
} }
/**
* @brief Get tSma data
*
* @param pTsdb
* @param pData
* @param indexUid
* @param interval
* @param intervalUnit
* @param tableUid
* @param colId
* @param querySKey
* @param nMaxResult
* @return int32_t
*/
int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit, int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit,
tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, int32_t nMaxResult) { tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, int32_t nMaxResult) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -1094,4 +1281,19 @@ int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid,
tsdbWarn("vgId:%d get tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); tsdbWarn("vgId:%d get tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
} }
return code; return code;
}
/**
* @brief Drop tSma Data and caches
*
* @param pTsdb
* @param msg
* @return int32_t
*/
int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbDropTSmaDataImpl(pTsdb, indexUid)) < 0) {
tsdbWarn("vgId:%d drop tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
}
return code;
} }

View File

@ -202,15 +202,23 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
return -1; return -1;
} }
if (metaDropTSma(pVnode->pMeta, vDropSmaReq.indexName) < 0) {
// TODO: handle error
return -1;
}
// TODO: send msg to stream computing to drop tSma // TODO: send msg to stream computing to drop tSma
// if ((send msg to stream computing) < 0) { // if ((send msg to stream computing) < 0) {
// tdDestroyTSma(&vCreateSmaReq); // tdDestroyTSma(&vCreateSmaReq);
// return -1; // return -1;
// } // }
//
if (metaDropTSma(pVnode->pMeta, vDropSmaReq.indexUid) < 0) {
// TODO: handle error
return -1;
}
if(tsdbDropTSmaData(pVnode->pTsdb, vDropSmaReq.indexUid) < 0) {
// TODO: handle error
return -1;
}
// TODO: return directly or go on follow steps? // TODO: return directly or go on follow steps?
#endif #endif
} break; } break;

View File

@ -272,8 +272,8 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
taosArrayDestroy(pUids); taosArrayDestroy(pUids);
// resource release // resource release
metaRemoveSmaFromDb(pMeta, smaIndexName1); metaRemoveSmaFromDb(pMeta, indexUid1);
metaRemoveSmaFromDb(pMeta, smaIndexName2); metaRemoveSmaFromDb(pMeta, indexUid2);
tdDestroyTSma(&tSma); tdDestroyTSma(&tSma);
metaClose(pMeta); metaClose(pMeta);

View File

@ -76,11 +76,12 @@ typedef struct SResultRowCell {
* If the number of generated results is greater than this value, * If the number of generated results is greater than this value,
* query query will be halt and return results to client immediate. * query query will be halt and return results to client immediate.
*/ */
typedef struct SRspResultInfo { typedef struct SResultInfo { // TODO refactor
int64_t total; // total generated result size in rows int64_t totalRows; // total generated result size in rows
int32_t capacity; // capacity of current result output buffer int64_t totalBytes; // total results in bytes.
int32_t threshold; // result size threshold in rows. int32_t capacity; // capacity of current result output buffer
} SRspResultInfo; int32_t threshold; // result size threshold in rows.
} SResultInfo;
typedef struct SColumnFilterElem { typedef struct SColumnFilterElem {
int16_t bytes; // column length int16_t bytes; // column length
@ -160,8 +161,8 @@ typedef struct STaskCostInfo {
typedef struct SOperatorCostInfo { typedef struct SOperatorCostInfo {
uint64_t openCost; uint64_t openCost;
uint64_t execCost; uint64_t execCost;
uint64_t totalRows; // uint64_t totalRows;
uint64_t totalBytes; // uint64_t totalBytes;
} SOperatorCostInfo; } SOperatorCostInfo;
typedef struct { typedef struct {
@ -301,7 +302,7 @@ typedef struct STaskRuntimeEnv {
int64_t currentOffset; // dynamic offset value int64_t currentOffset; // dynamic offset value
STableQueryInfo* current; STableQueryInfo* current;
SRspResultInfo resultInfo; SResultInfo resultInfo;
SHashObj* pTableRetrieveTsMap; SHashObj* pTableRetrieveTsMap;
struct SUdfInfo* pUdfInfo; struct SUdfInfo* pUdfInfo;
} STaskRuntimeEnv; } STaskRuntimeEnv;
@ -324,7 +325,7 @@ typedef struct SOperatorInfo {
STaskRuntimeEnv* pRuntimeEnv; // todo remove it STaskRuntimeEnv* pRuntimeEnv; // todo remove it
SExecTaskInfo* pTaskInfo; SExecTaskInfo* pTaskInfo;
SOperatorCostInfo cost; SOperatorCostInfo cost;
SResultInfo resultInfo;
struct SOperatorInfo** pDownstream; // downstram pointer list struct SOperatorInfo** pDownstream; // downstram pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
__optr_fn_t getNextFn; __optr_fn_t getNextFn;
@ -539,6 +540,8 @@ typedef struct SFillOperatorInfo {
void** p; void** p;
SSDataBlock* existNewGroupBlock; SSDataBlock* existNewGroupBlock;
bool multigroupResult; bool multigroupResult;
SInterval intervalInfo;
int32_t capacity;
} SFillOperatorInfo; } SFillOperatorInfo;
typedef struct SGroupKeys { typedef struct SGroupKeys {
@ -649,21 +652,20 @@ SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo); SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, int32_t numOfDownstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo); SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval,
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock,
int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput); SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput, bool multigroupResult);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput); SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,

View File

@ -248,7 +248,7 @@ static int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo *binfo, int32_t numOfCo
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); static void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win);
static void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo); static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo);
static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable); static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable);
static void setParamForStableStddev(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr); static void setParamForStableStddev(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr);
static void setParamForStableStddevByColData(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes); static void setParamForStableStddevByColData(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes);
@ -7083,52 +7083,54 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} }
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo *pInfo, STaskRuntimeEnv* pRuntimeEnv, bool* newgroup) { static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo *pInfo, SResultInfo* pResultInfo, bool* newgroup, SExecTaskInfo* pTaskInfo) {
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, TASK_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, TASK_COMPLETED)? pTaskInfo->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo)); taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); // taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p); doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pResultInfo->capacity, pInfo->p);
pInfo->existNewGroupBlock = NULL; pInfo->existNewGroupBlock = NULL;
*newgroup = true; *newgroup = true;
} }
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, STaskRuntimeEnv *pRuntimeEnv, bool *newgroup) { static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SResultInfo *pResultInfo, bool *newgroup) {
if (taosFillHasMoreResults(pInfo->pFillInfo)) { if (taosFillHasMoreResults(pInfo->pFillInfo)) {
*newgroup = false; *newgroup = false;
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity, pInfo->p); doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pResultInfo->capacity, pInfo->p);
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult)) { if (pInfo->pRes->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult)) {
return; return;
} }
} }
// handle the cached new group data block // handle the cached new group data block
if (pInfo->existNewGroupBlock) { if (pInfo->existNewGroupBlock) {
doHandleRemainBlockForNewGroupImpl(pInfo, pRuntimeEnv, newgroup); // doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup);
} }
} }
static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) { static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) {
SFillOperatorInfo *pInfo = pOperator->info; SFillOperatorInfo *pInfo = pOperator->info;
pInfo->pRes->info.rows = 0;
SResultInfo* pResultInfo = &pOperator->resultInfo;
blockDataCleanup(pInfo->pRes);
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
STaskRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup);
doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup); if (pInfo->pRes->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) {
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) {
return pInfo->pRes; return pInfo->pRes;
} }
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
while(1) { while(1) {
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup); SSDataBlock* pBlock = pDownstream->getNextFn(pDownstream, newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (*newgroup) { if (*newgroup) {
assert(pBlock != NULL); assert(pBlock != NULL);
@ -7140,7 +7142,7 @@ static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) {
// Fill the previous group data block, before handle the data block of new group. // Fill the previous group data block, before handle the data block of new group.
// Close the fill operation for previous group data block // Close the fill operation for previous group data block
taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey); // taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey);
} else { } else {
if (pBlock == NULL) { if (pBlock == NULL) {
if (pInfo->totalInputRows == 0) { if (pInfo->totalInputRows == 0) {
@ -7148,7 +7150,7 @@ static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) {
return NULL; return NULL;
} }
taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey); // taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey);
} else { } else {
pInfo->totalInputRows += pBlock->info.rows; pInfo->totalInputRows += pBlock->info.rows;
taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey); taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey);
@ -7156,25 +7158,25 @@ static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) {
} }
} }
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p); doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pInfo->capacity, pInfo->p);
// current group has no more result to return // current group has no more result to return
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
// 1. The result in current group not reach the threshold of output result, continue // 1. The result in current group not reach the threshold of output result, continue
// 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL || (!pInfo->multigroupResult)) { if (pInfo->pRes->info.rows > pResultInfo->threshold || pBlock == NULL || (!pInfo->multigroupResult)) {
return pInfo->pRes; return pInfo->pRes;
} }
doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup); // doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup);
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL) { if (pInfo->pRes->info.rows > pOperator->resultInfo.threshold || pBlock == NULL) {
return pInfo->pRes; return pInfo->pRes;
} }
} else if (pInfo->existNewGroupBlock) { // try next group } else if (pInfo->existNewGroupBlock) { // try next group
assert(pBlock != NULL); assert(pBlock != NULL);
doHandleRemainBlockForNewGroupImpl(pInfo, pRuntimeEnv, newgroup); // doHandleRemainBlockForNewGroupImpl(pInfo, pRuntimeEnv, newgroup);
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) { if (pInfo->pRes->info.rows > pResultInfo->threshold) {
return pInfo->pRes; return pInfo->pRes;
} }
} else { } else {
@ -7537,8 +7539,7 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3
return 0; return 0;
} }
SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, int32_t numOfDownstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo) {
ASSERT(numOfDownstream == 1);
SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo)); SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
@ -7622,7 +7623,7 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); // pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
@ -7647,7 +7648,7 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper
pInfo->colIndex = -1; pInfo->colIndex = -1;
pInfo->reptScan = false; pInfo->reptScan = false;
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); // pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
@ -7712,7 +7713,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); // pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
@ -7736,7 +7737,7 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); // pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
@ -7827,52 +7828,76 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
return NULL; return NULL;
} }
SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) { static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, int64_t* fillVal,
SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo)); STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) {
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); struct SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, (int64_t*)fillVal);
pInfo->multigroupResult = multigroupResult;
{ TSKEY sk = TMIN(win.skey, win.ekey);
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; TSKEY ek = TMAX(win.skey, win.ekey);
struct SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfOutput, pQueryAttr->fillVal);
STimeWindow w = TSWINDOW_INITIALIZER;
TSKEY sk = TMIN(pQueryAttr->window.skey, pQueryAttr->window.ekey); // TODO set correct time precision
TSKEY ek = TMAX(pQueryAttr->window.skey, pQueryAttr->window.ekey); STimeWindow w = TSWINDOW_INITIALIZER;
// getAlignQueryTimeWindow(pQueryAttr, pQueryAttr->window.skey, sk, ek, &w); getAlignQueryTimeWindow(pInterval, TSDB_TIME_PRECISION_MILLI, win.skey, sk, ek, &w);
pInfo->pFillInfo = int32_t order = TSDB_ORDER_ASC;
taosCreateFillInfo(pQueryAttr->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput, pInfo->pFillInfo =
pQueryAttr->interval.sliding, pQueryAttr->interval.slidingUnit, taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval->sliding,
(int8_t)pQueryAttr->precision, pQueryAttr->fillType, pColInfo, pRuntimeEnv->qinfo); pInterval->slidingUnit, (int8_t)pInterval->precision, fillType, pColInfo, id);
pInfo->p = calloc(numOfOutput, POINTER_BYTES); pInfo->p = calloc(numOfCols, POINTER_BYTES);
if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
} else {
return TSDB_CODE_SUCCESS;
} }
}
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock,
int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo) {
SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pInfo->pRes = pResBlock;
pInfo->multigroupResult = multigroupResult;
pInfo->intervalInfo = *pInterval;
SResultInfo* pResultInfo = &pOperator->resultInfo;
// int32_t code = initFillInfo(pInfo, pExpr, numOfCols, fillVal, , pResultInfo->capacity, pTaskInfo->id.str, pInterval, fillType);
// if (code != TSDB_CODE_SUCCESS) {
// goto _error;
// }
pOperator->name = "FillOperator"; pOperator->name = "FillOperator";
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = OP_Fill; // pOperator->operatorType = OP_Fill;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = doFill; pOperator->getNextFn = doFill;
pOperator->closeFn = destroySFillOperatorInfo; pOperator->pTaskInfo = pTaskInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); pOperator->closeFn = destroySFillOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
_error:
tfree(pOperator);
tfree(pInfo);
return NULL;
} }
SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) { SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) {
SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo)); SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
// pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr); // pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr);
// pInfo->slimit = pQueryAttr->slimit; // pInfo->slimit = pQueryAttr->slimit;
// pInfo->limit = pQueryAttr->limit; // pInfo->limit = pQueryAttr->limit;
// pInfo->capacity = pRuntimeEnv->resultInfo.capacity; // pInfo->capacity = pResultInfo->capacity;
// pInfo->threshold = (int64_t)(pInfo->capacity * 0.8); // pInfo->threshold = (int64_t)(pInfo->capacity * 0.8);
// pInfo->currentOffset = pQueryAttr->limit.offset; // pInfo->currentOffset = pQueryAttr->limit.offset;
// pInfo->currentGroupOffset = pQueryAttr->slimit.offset; // pInfo->currentGroupOffset = pQueryAttr->slimit.offset;
@ -7895,9 +7920,8 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
offset += pExpr[index->colIndex].base.resSchema.bytes; offset += pExpr[index->colIndex].base.resSchema.bytes;
} }
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pOperator->resultInfo.capacity);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SLimitOperator"; pOperator->name = "SLimitOperator";
// pOperator->operatorType = OP_SLimit; // pOperator->operatorType = OP_SLimit;
@ -7920,7 +7944,7 @@ static SSDataBlock* doTagScan(SOperatorInfo *pOperator, bool* newgroup) {
} }
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
int32_t maxNumOfTables = (int32_t)pRuntimeEnv->resultInfo.capacity; int32_t maxNumOfTables = (int32_t)pResultInfo->capacity;
STagScanInfo *pInfo = pOperator->info; STagScanInfo *pInfo = pOperator->info;
SSDataBlock *pRes = pInfo->pRes; SSDataBlock *pRes = pInfo->pRes;
@ -8046,7 +8070,7 @@ static SSDataBlock* doTagScan(SOperatorInfo *pOperator, bool* newgroup) {
SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) {
STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo)); STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo));
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); // pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
assert(numOfGroup == 0 || numOfGroup == 1); assert(numOfGroup == 0 || numOfGroup == 1);
@ -8372,7 +8396,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId) {
return pTaskInfo; return pTaskInfo;
} }
static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId); static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableGroupInfo *pTableGroupInfo, uint64_t queryId, uint64_t taskId);
static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId); static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId);
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo); static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
@ -8380,20 +8404,13 @@ static SArray* extractScanColumnId(SNodeList* pNodeList);
static SArray* extractColumnInfo(SNodeList* pNodeList); static SArray* extractColumnInfo(SNodeList* pNodeList);
SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
// if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_PROJECT) { // ignore the project node
// pPhyNode = nodesListGetNode(pPhyNode->pChildren, 0);
// }
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode;
size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols); size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols);
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, (uint64_t)queryId, taskId); tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo);
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count,
pScanPhyNode->reverse, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) { } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc);
@ -8401,10 +8418,8 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == nodeType(pPhyNode)) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == nodeType(pPhyNode)) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
STableGroupInfo groupInfo = {0}; int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId);
SArray* tableIdList = extractTableIdList(&groupInfo);
SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc);
SArray* colList = extractScanColumnId(pScanPhyNode->pScanCols); SArray* colList = extractScanColumnId(pScanPhyNode->pScanCols);
@ -8597,22 +8612,20 @@ SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo) {
return tableIdList; return tableIdList;
} }
tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId) { tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableGroupInfo *pTableGroupInfo, uint64_t queryId, uint64_t taskId) {
STableGroupInfo groupInfo = {0};
uint64_t uid = pTableScanNode->scan.uid; uint64_t uid = pTableScanNode->scan.uid;
int32_t code = doCreateTableGroup(pHandle->meta, pTableScanNode->scan.tableType, uid, &groupInfo, queryId, taskId); int32_t code = doCreateTableGroup(pHandle->meta, pTableScanNode->scan.tableType, uid, pTableGroupInfo, queryId, taskId);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
if (groupInfo.numOfTables == 0) { if (pTableGroupInfo->numOfTables == 0) {
code = 0; code = 0;
qDebug("no table qualified for query, TID:0x%"PRIx64", QID:0x%"PRIx64, taskId, queryId); qDebug("no table qualified for query, TID:0x%"PRIx64", QID:0x%"PRIx64, taskId, queryId);
goto _error; goto _error;
} }
return createDataReaderImpl(pTableScanNode, &groupInfo, pHandle->reader, queryId, taskId); return createDataReaderImpl(pTableScanNode, pTableGroupInfo, pHandle->reader, queryId, taskId);
_error: _error:
terrno = code; terrno = code;
@ -8847,7 +8860,7 @@ static void doUpdateExprColumnIndex(STaskAttr *pQueryAttr) {
} }
} }
void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo) { void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo) {
const int32_t DEFAULT_RESULT_MSG_SIZE = 1024 * (1024 + 512); const int32_t DEFAULT_RESULT_MSG_SIZE = 1024 * (1024 + 512);
// the minimum number of rows for projection query // the minimum number of rows for projection query
@ -8868,7 +8881,7 @@ void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo) {
} }
pResultInfo->threshold = (int32_t)(pResultInfo->capacity * THRESHOLD_RATIO); pResultInfo->threshold = (int32_t)(pResultInfo->capacity * THRESHOLD_RATIO);
pResultInfo->total = 0; pResultInfo->totalRows = 0;
} }
//TODO refactor //TODO refactor

View File

@ -61,7 +61,7 @@ typedef struct SFillInfo {
SFillColInfo* pFillCol; // column info for fill operations SFillColInfo* pFillCol; // column info for fill operations
SFillTagColInfo* pTags; // tags value for filling gap SFillTagColInfo* pTags; // tags value for filling gap
void* handle; // for debug purpose const char* id;
} SFillInfo; } SFillInfo;
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows); int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows);

View File

@ -342,7 +342,7 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType,
struct SFillColInfo* pCol, void* handle) { struct SFillColInfo* pCol, const char* id) {
if (fillType == TSDB_FILL_NONE) { if (fillType == TSDB_FILL_NONE) {
return NULL; return NULL;
} }
@ -357,7 +357,7 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag
pFillInfo->numOfCols = numOfCols; pFillInfo->numOfCols = numOfCols;
pFillInfo->precision = precision; pFillInfo->precision = precision;
pFillInfo->alloc = capacity; pFillInfo->alloc = capacity;
pFillInfo->handle = handle; pFillInfo->id = id;
pFillInfo->interval.interval = slidingTime; pFillInfo->interval.interval = slidingTime;
pFillInfo->interval.intervalUnit = slidingUnit; pFillInfo->interval.intervalUnit = slidingUnit;

View File

@ -338,8 +338,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, "TSDB messed message")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag value") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag value")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECREATED, "Table re-created") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECREATED, "Table re-created")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_SMA_INDEX_IN_META, "No sma index in meta")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR, "TDB env open error") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR, "TDB env open error")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_SMA_INDEX_IN_META, "No sma index in meta")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_SMA_STAT, "Invalid sma state")
// query // query
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, "Invalid handle") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, "Invalid handle")

View File

@ -15,10 +15,13 @@
# ---- insert # ---- insert
./test.sh -f tsim/insert/basic0.sim ./test.sh -f tsim/insert/basic0.sim
./test.sh -f tsim/insert/basic1.sim
./test.sh -f tsim/insert/backquote.sim
./test.sh -f tsim/insert/null.sim ./test.sh -f tsim/insert/null.sim
# ---- query # ---- query
./test.sh -f tsim/query/interval.sim ./test.sh -f tsim/query/interval.sim
#./test.sh -f tsim/query/interval-offset.sim
# ---- table # ---- table
./test.sh -f tsim/table/basic1.sim ./test.sh -f tsim/table/basic1.sim

View File

@ -36,8 +36,9 @@ while $dbCnt < 2
$dbCnt = $dbCnt + 1 $dbCnt = $dbCnt + 1
print =============== create super table, include all type print =============== create super table, include all type
sql create table `stable` (`timestamp` timestamp, `int` int, `binary` binary(16), `nchar` nchar(16)) tags (`float` float, `Binary` binary(16), `Nchar` nchar(16)) print notes: after nchar show ok, modify binary to nchar
sql create table `Stable` (`timestamp` timestamp, `int` int, `Binary` binary(32), `Nchar` nchar(32)) tags (`float` float, `binary` binary(16), `nchar` nchar(16)) sql create table `stable` (`timestamp` timestamp, `int` int, `binary` binary(16), `nchar` binary(16)) tags (`float` float, `Binary` binary(16), `Nchar` nchar(16))
sql create table `Stable` (`timestamp` timestamp, `int` int, `Binary` binary(32), `Nchar` binary(32)) tags (`float` float, `binary` binary(16), `nchar` nchar(16))
sql show stables sql show stables
print rows: $rows print rows: $rows
@ -47,10 +48,14 @@ while $dbCnt < 2
return -1 return -1
endi endi
if $data00 != Stable then if $data00 != Stable then
return -1 if $data00 != stable then
return -1
endi
endi endi
if $data10 != stable then if $data10 != Stable then
return -1 if $data10 != stable then
return -1
endi
endi endi
print =============== create child table print =============== create child table
@ -145,26 +150,26 @@ while $dbCnt < 2
return -1 return -1
endi endi
print =============== query data from st, but not support select * from super table, waiting fix #print =============== query data from st, but not support select * from super table, waiting fix
sql select count(*) from `stable` #sql select count(*) from `stable`
print rows: $rows #print rows: $rows
print $data00 $data01 $data02 $data03 #print $data00 $data01 $data02 $data03
if $rows != 1 then #if $rows != 1 then
return -1 # return -1
endi #endi
if $data00 != 4 then #if $data00 != 4 then
return -1 # return -1
endi #endi
sql select count(*) from `Stable` #sql select count(*) from `Stable`
print rows: $rows #print rows: $rows
print $data00 $data01 $data02 $data03 #print $data00 $data01 $data02 $data03
if $rows != 1 then #if $rows != 1 then
return -1 # return -1
endi #endi
if $data00 != 4 then #if $data00 != 4 then
return -1 # return -1
endi #endi
#sql select * from st #sql select * from `stable`
#if $rows != 4 then #if $rows != 4 then
# return -1 # return -1
#endi #endi
@ -178,7 +183,7 @@ system sh/exec.sh -n dnode1 -s start
$loop_cnt = 0 $loop_cnt = 0
check_dnode_ready: check_dnode_ready:
$loop_cnt = $loop_cnt + 1 $loop_cnt = $loop_cnt + 1
sleep 100 sleep 200
if $loop_cnt == 10 then if $loop_cnt == 10 then
print ====> dnode not ready! print ====> dnode not ready!
return -1 return -1
@ -228,10 +233,14 @@ while $dbCnt < 2
return -1 return -1
endi endi
if $data00 != Stable then if $data00 != Stable then
return -1 if $data00 != stable then
return -1
endi
endi endi
if $data10 != stable then if $data10 != Stable then
return -1 if $data10 != stable then
return -1
endi
endi endi
sql show tables sql show tables
@ -313,26 +322,26 @@ while $dbCnt < 2
return -1 return -1
endi endi
print =============== query data from st, but not support select * from super table, waiting fix #print =============== query data from st, but not support select * from super table, waiting fix
sql select count(*) from `stable` #sql select count(*) from `stable`
print rows: $rows #print rows: $rows
print $data00 $data01 $data02 $data03 #print $data00 $data01 $data02 $data03
if $rows != 1 then #if $rows != 1 then
return -1 # return -1
endi #endi
if $data00 != 4 then #if $data00 != 4 then
return -1 # return -1
endi #endi
sql select count(*) from `Stable` #sql select count(*) from `Stable`
print rows: $rows #print rows: $rows
print $data00 $data01 $data02 $data03 #print $data00 $data01 $data02 $data03
if $rows != 1 then #if $rows != 1 then
return -1 # return -1
endi #endi
if $data00 != 4 then #if $data00 != 4 then
return -1 # return -1
endi #endi
#sql select * from st #sql select * from `stable`
#if $rows != 4 then #if $rows != 4 then
# return -1 # return -1
#endi #endi

View File

@ -55,7 +55,8 @@ if $rows != 4 then
return -1 return -1
endi endi
if $data01 != true then if $data01 != 1 then
print expect 1, actual: $data01
return -1 return -1
endi endi
@ -80,7 +81,7 @@ system sh/exec.sh -n dnode1 -s start
$loop_cnt = 0 $loop_cnt = 0
check_dnode_ready: check_dnode_ready:
$loop_cnt = $loop_cnt + 1 $loop_cnt = $loop_cnt + 1
sleep 100 sleep 200
if $loop_cnt == 10 then if $loop_cnt == 10 then
print ====> dnode not ready! print ====> dnode not ready!
return -1 return -1
@ -105,7 +106,7 @@ if $rows != 4 then
return -1 return -1
endi endi
if $data01 != true then if $data01 != 1 then
return -1 return -1
endi endi

View File

@ -9,9 +9,9 @@ run tsim/db/error1.sim
run tsim/dnode/basic1.sim run tsim/dnode/basic1.sim
run tsim/insert/basic0.sim run tsim/insert/basic0.sim
#run tsim/insert/basic1.sim # TD-14246 run tsim/insert/basic1.sim
#run tsim/insert/backquote.sim # TD-14261 run tsim/insert/backquote.sim
#run tsim/insert/null.sim run tsim/insert/null.sim
run tsim/query/interval.sim run tsim/query/interval.sim
#run tsim/query/interval-offset.sim # TD-14266 #run tsim/query/interval-offset.sim # TD-14266

View File

@ -58,7 +58,7 @@ static struct argp_option options[] = {
{"check", 'k', "CHECK", 0, "Check tables."}, {"check", 'k', "CHECK", 0, "Check tables."},
{"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."}, {"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."},
{"timezone", 'z', "TIMEZONE", 0, "Time zone of the shell, default is local."}, {"timezone", 'z', "TIMEZONE", 0, "Time zone of the shell, default is local."},
{"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup|sync|speen|fqdn."}, {"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup|sync|speed|fqdn."},
{"pktlen", 'l', "PKTLEN", 0, "Packet length used for net test, default is 1000 bytes."}, {"pktlen", 'l', "PKTLEN", 0, "Packet length used for net test, default is 1000 bytes."},
{"pktnum", 'N', "PKTNUM", 0, "Packet numbers used for net test, default is 100."}, {"pktnum", 'N', "PKTNUM", 0, "Packet numbers used for net test, default is 100."},
// Shuduo: 3.0 does not support UDP any more // Shuduo: 3.0 does not support UDP any more

View File

@ -406,7 +406,7 @@ static int32_t taosNetCheckRpc(const char* serverFqdn, uint16_t port, uint16_t p
reqMsg.code = 0; reqMsg.code = 0;
reqMsg.handle = NULL; // rpc handle returned to app reqMsg.handle = NULL; // rpc handle returned to app
reqMsg.ahandle = NULL; // app handle set by client reqMsg.ahandle = NULL; // app handle set by client
strcpy(reqMsg.pCont, "nettest"); strcpy(reqMsg.pCont, "dnode-nettest");
rpcSendRecv(pRpcConn, &epSet, &reqMsg, &rspMsg); rpcSendRecv(pRpcConn, &epSet, &reqMsg, &rspMsg);
@ -442,7 +442,7 @@ static void taosNetTestStartup(char *host, int32_t port) {
SStartupReq *pStep = malloc(sizeof(SStartupReq)); SStartupReq *pStep = malloc(sizeof(SStartupReq));
while (1) { while (1) {
int32_t code = taosNetCheckRpc(host, port + TSDB_PORT_DNODEDNODE, 20, 0, pStep); int32_t code = taosNetCheckRpc(host, port, 20, 0, pStep);
if (code > 0) { if (code > 0) {
code = taosNetParseStartup(pStep); code = taosNetParseStartup(pStep);
} }
@ -499,48 +499,46 @@ static void taosNetCheckSync(char *host, int32_t port) {
} }
static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) { static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) {
int32_t endPort = startPort + TSDB_PORT_SYNC;
char spi = 0; char spi = 0;
uInfo("check rpc, host:%s startPort:%d endPort:%d pkgLen:%d\n", host, startPort, endPort, pkgLen); uInfo("check rpc, host:%s Port:%d pkgLen:%d\n", host, startPort, pkgLen);
for (uint16_t port = startPort; port < endPort; port++) { uint16_t port = startPort;
int32_t sendpkgLen; int32_t sendpkgLen;
if (pkgLen <= tsRpcMaxUdpSize) { if (pkgLen <= tsRpcMaxUdpSize) {
sendpkgLen = tsRpcMaxUdpSize + 1000; sendpkgLen = tsRpcMaxUdpSize + 1000;
} else { } else {
sendpkgLen = pkgLen; sendpkgLen = pkgLen;
}
tsRpcForceTcp = 1;
int32_t ret = taosNetCheckRpc(host, port, sendpkgLen, spi, NULL);
if (ret < 0) {
printf("failed to test TCP port:%d\n", port);
} else {
printf("successed to test TCP port:%d\n", port);
}
if (pkgLen >= tsRpcMaxUdpSize) {
sendpkgLen = tsRpcMaxUdpSize - 1000;
} else {
sendpkgLen = pkgLen;
}
tsRpcForceTcp = 0;
ret = taosNetCheckRpc(host, port, pkgLen, spi, NULL);
if (ret < 0) {
printf("failed to test UDP port:%d\n", port);
} else {
printf("successed to test UDP port:%d\n", port);
}
} }
taosNetCheckSync(host, startPort + TSDB_PORT_SYNC); tsRpcForceTcp = 1;
int32_t ret = taosNetCheckRpc(host, port, sendpkgLen, spi, NULL);
if (ret < 0) {
printf("failed to test TCP port:%d\n", port);
} else {
printf("successed to test TCP port:%d\n", port);
}
if (pkgLen >= tsRpcMaxUdpSize) {
sendpkgLen = tsRpcMaxUdpSize - 1000;
} else {
sendpkgLen = pkgLen;
}
/*
tsRpcForceTcp = 0;
ret = taosNetCheckRpc(host, port, pkgLen, spi, NULL);
if (ret < 0) {
printf("failed to test UDP port:%d\n", port);
} else {
printf("successed to test UDP port:%d\n", port);
}
*/
taosNetCheckSync(host, startPort);
} }
static void taosNetTestClient(char *host, int32_t startPort, int32_t pkgLen) { static void taosNetTestClient(char *host, int32_t startPort, int32_t pkgLen) {
int32_t endPort = startPort + 11; uInfo("work as client, host:%s Port:%d pkgLen:%d\n", host, startPort, pkgLen);
uInfo("work as client, host:%s startPort:%d endPort:%d pkgLen:%d\n", host, startPort, endPort, pkgLen);
uint32_t serverIp = taosGetIpv4FromFqdn(host); uint32_t serverIp = taosGetIpv4FromFqdn(host);
if (serverIp == 0xFFFFFFFF) { if (serverIp == 0xFFFFFFFF) {
@ -549,15 +547,14 @@ static void taosNetTestClient(char *host, int32_t startPort, int32_t pkgLen) {
} }
uInfo("server ip:%s is resolved from host:%s", taosIpStr(serverIp), host); uInfo("server ip:%s is resolved from host:%s", taosIpStr(serverIp), host);
taosNetCheckPort(serverIp, startPort, endPort, pkgLen); taosNetCheckPort(serverIp, startPort, startPort, pkgLen);
} }
static void taosNetTestServer(char *host, int32_t startPort, int32_t pkgLen) { static void taosNetTestServer(char *host, int32_t startPort, int32_t pkgLen) {
int32_t endPort = startPort + 11; uInfo("work as server, host:%s Port:%d pkgLen:%d\n", host, startPort, pkgLen);
uInfo("work as server, host:%s startPort:%d endPort:%d pkgLen:%d\n", host, startPort, endPort, pkgLen);
int32_t port = startPort; int32_t port = startPort;
int32_t num = endPort - startPort + 1; int32_t num = 1;
if (num < 0) num = 1; if (num < 0) num = 1;
TdThread *pids = malloc(2 * num * sizeof(TdThread)); TdThread *pids = malloc(2 * num * sizeof(TdThread));