Merge remote-tracking branch 'origin/3.0' into feature/shm

This commit is contained in:
Shengliang Guan 2022-04-07 10:58:26 +08:00
commit c1a8779544
48 changed files with 724 additions and 440 deletions

View File

@ -110,10 +110,11 @@ typedef struct SFileBlockInfo {
#define FUNCTION_COV 38 #define FUNCTION_COV 38
typedef struct SResultRowEntryInfo { typedef struct SResultRowEntryInfo {
int8_t hasResult; // result generated, not NULL value // int8_t hasResult:6; // result generated, not NULL value
bool initialized; // output buffer has been initialized bool initialized:1; // output buffer has been initialized
bool complete; // query has completed bool complete:1; // query has completed
uint32_t numOfRes; // num of output result in current buffer uint8_t isNullRes:6; // the result is null
uint8_t numOfRes; // num of output result in current buffer
} SResultRowEntryInfo; } SResultRowEntryInfo;
// determine the real data need to calculated the result // determine the real data need to calculated the result
@ -156,7 +157,6 @@ typedef struct SResultDataInfo {
#define GET_RES_INFO(ctx) ((ctx)->resultInfo) #define GET_RES_INFO(ctx) ((ctx)->resultInfo)
#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowEntryInfo))) #define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowEntryInfo)))
#define DATA_SET_FLAG ',' // to denote the output area has data, not null value
typedef struct SInputColumnInfoData { typedef struct SInputColumnInfoData {
int32_t totalRows; // total rows in current columnar data int32_t totalRows; // total rows in current columnar data

View File

@ -94,6 +94,7 @@ typedef struct SColumnDefNode {
char colName[TSDB_COL_NAME_LEN]; char colName[TSDB_COL_NAME_LEN];
SDataType dataType; SDataType dataType;
char comments[TSDB_STB_COMMENT_LEN]; char comments[TSDB_STB_COMMENT_LEN];
bool sma;
} SColumnDefNode; } SColumnDefNode;
typedef struct SCreateTableStmt { typedef struct SCreateTableStmt {

View File

@ -512,6 +512,16 @@ static FORCE_INLINE int32_t convertToInteger(SVariant *pVariant, int64_t *result
setNull((char *)result, type, tDataTypes[type].bytes); setNull((char *)result, type, tDataTypes[type].bytes);
return 0; return 0;
} }
if (IS_SIGNED_NUMERIC_TYPE(pVariant->nType) || (pVariant->nType == TSDB_DATA_TYPE_BOOL)) {
*result = pVariant->i;
} else if (IS_UNSIGNED_NUMERIC_TYPE(pVariant->nType)) {
*result = pVariant->u;
} else if (IS_FLOAT_TYPE(pVariant->nType)) {
*result = (int64_t) pVariant->d;
} else {
//TODO: handling var types
}
#if 0 #if 0
errno = 0; errno = 0;
if (IS_SIGNED_NUMERIC_TYPE(pVariant->nType) || (pVariant->nType == TSDB_DATA_TYPE_BOOL)) { if (IS_SIGNED_NUMERIC_TYPE(pVariant->nType) || (pVariant->nType == TSDB_DATA_TYPE_BOOL)) {
@ -1038,4 +1048,3 @@ char * taosVariantGet(SVariant *pVar, int32_t type) {
return NULL; return NULL;
} }

View File

@ -1,5 +1,5 @@
set(META_DB_IMPL_LIST "BDB" "TDB") set(META_DB_IMPL_LIST "BDB" "TDB")
set(META_DB_IMPL "BDB" CACHE STRING "Use BDB as the default META implementation") set(META_DB_IMPL "TDB" CACHE STRING "Use BDB as the default META implementation")
set_property(CACHE META_DB_IMPL PROPERTY STRINGS ${META_DB_IMPL_LIST}) set_property(CACHE META_DB_IMPL PROPERTY STRINGS ${META_DB_IMPL_LIST})
if(META_DB_IMPL IN_LIST META_DB_IMPL_LIST) if(META_DB_IMPL IN_LIST META_DB_IMPL_LIST)

View File

@ -17,12 +17,12 @@
#define _TD_TSDB_READ_IMPL_H_ #define _TD_TSDB_READ_IMPL_H_
#include "os.h" #include "os.h"
#include "tcommon.h"
#include "tfs.h" #include "tfs.h"
#include "tsdb.h" #include "tsdb.h"
#include "tsdbFile.h" #include "tsdbFile.h"
#include "tskiplist.h"
#include "tsdbMemory.h" #include "tsdbMemory.h"
#include "tcommon.h" #include "tskiplist.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -31,7 +31,6 @@ extern "C" {
typedef struct SReadH SReadH; typedef struct SReadH SReadH;
typedef struct { typedef struct {
int32_t tid;
uint32_t len; uint32_t len;
uint32_t offset; uint32_t offset;
uint32_t hasLast : 2; uint32_t hasLast : 2;
@ -222,7 +221,8 @@ int tsdbLoadBlockIdx(SReadH *pReadh);
int tsdbSetReadTable(SReadH *pReadh, STable *pTable); int tsdbSetReadTable(SReadH *pReadh, STable *pTable);
int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget); int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget);
int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo); int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo);
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds); int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds,
int numOfColsIds);
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock); int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock);
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx); int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx);
void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx); void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx);

View File

@ -1,21 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "meta.h"
int metaCommit(SMeta *pMeta) {
// TODO
return 0;
}

View File

@ -16,8 +16,20 @@
#include "metaDef.h" #include "metaDef.h"
#include "tdbInt.h" #include "tdbInt.h"
typedef struct SPoolMem {
int64_t size;
struct SPoolMem *prev;
struct SPoolMem *next;
} SPoolMem;
static SPoolMem *openPool();
static void clearPool(SPoolMem *pPool);
static void closePool(SPoolMem *pPool);
static void *poolMalloc(void *arg, size_t size);
static void poolFree(void *arg, void *ptr);
struct SMetaDB { struct SMetaDB {
TXN txn;
TENV *pEnv; TENV *pEnv;
TDB *pTbDB; TDB *pTbDB;
TDB *pSchemaDB; TDB *pSchemaDB;
@ -25,6 +37,7 @@ struct SMetaDB {
TDB *pStbIdx; TDB *pStbIdx;
TDB *pNtbIdx; TDB *pNtbIdx;
TDB *pCtbIdx; TDB *pCtbIdx;
SPoolMem *pPool;
}; };
typedef struct __attribute__((__packed__)) { typedef struct __attribute__((__packed__)) {
@ -167,12 +180,19 @@ int metaOpenDB(SMeta *pMeta) {
return -1; return -1;
} }
pMetaDb->pPool = openPool();
tdbTxnOpen(&pMetaDb->txn, 0, poolMalloc, poolFree, pMetaDb->pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pMetaDb->pEnv, NULL);
pMeta->pDB = pMetaDb; pMeta->pDB = pMetaDb;
return 0; return 0;
} }
void metaCloseDB(SMeta *pMeta) { void metaCloseDB(SMeta *pMeta) {
if (pMeta->pDB) { if (pMeta->pDB) {
tdbCommit(pMeta->pDB->pEnv, &pMeta->pDB->txn);
tdbTxnClose(&pMeta->pDB->txn);
clearPool(pMeta->pDB->pPool);
tdbDbClose(pMeta->pDB->pCtbIdx); tdbDbClose(pMeta->pDB->pCtbIdx);
tdbDbClose(pMeta->pDB->pNtbIdx); tdbDbClose(pMeta->pDB->pNtbIdx);
tdbDbClose(pMeta->pDB->pStbIdx); tdbDbClose(pMeta->pDB->pStbIdx);
@ -206,13 +226,21 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
uid = metaGenerateUid(pMeta); uid = metaGenerateUid(pMeta);
} }
// check name and uid unique
if (tdbDbGet(pMetaDb->pTbDB, &uid, sizeof(uid), NULL, NULL) == 0) {
return -1;
}
if (tdbDbGet(pMetaDb->pNameIdx, pTbCfg->name, strlen(pTbCfg->name) + 1, NULL, NULL) == 0) {
return -1;
}
// save to table.db // save to table.db
pKey = &uid; pKey = &uid;
kLen = sizeof(uid); kLen = sizeof(uid);
pVal = pBuf = buf; pVal = pBuf = buf;
metaEncodeTbInfo(&pBuf, pTbCfg); metaEncodeTbInfo(&pBuf, pTbCfg);
vLen = POINTER_DISTANCE(pBuf, buf); vLen = POINTER_DISTANCE(pBuf, buf);
ret = tdbDbInsert(pMetaDb->pTbDB, pKey, kLen, pVal, vLen); ret = tdbDbInsert(pMetaDb->pTbDB, pKey, kLen, pVal, vLen, &pMetaDb->txn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
@ -226,15 +254,15 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
if (pTbCfg->type == META_SUPER_TABLE) { if (pTbCfg->type == META_SUPER_TABLE) {
schemaWrapper.nCols = pTbCfg->stbCfg.nCols; schemaWrapper.nCols = pTbCfg->stbCfg.nCols;
schemaWrapper.pSchema = pTbCfg->stbCfg.pSchema; schemaWrapper.pSchemaEx = pTbCfg->stbCfg.pSchema;
} else { } else {
schemaWrapper.nCols = pTbCfg->ntbCfg.nCols; schemaWrapper.nCols = pTbCfg->ntbCfg.nCols;
schemaWrapper.pSchema = pTbCfg->ntbCfg.pSchema; schemaWrapper.pSchemaEx = pTbCfg->ntbCfg.pSchema;
} }
pVal = pBuf = buf; pVal = pBuf = buf;
metaEncodeSchemaEx(&pBuf, &schemaWrapper); metaEncodeSchemaEx(&pBuf, &schemaWrapper);
vLen = POINTER_DISTANCE(pBuf, buf); vLen = POINTER_DISTANCE(pBuf, buf);
ret = tdbDbInsert(pMetaDb->pSchemaDB, pKey, kLen, pVal, vLen); ret = tdbDbInsert(pMetaDb->pSchemaDB, pKey, kLen, pVal, vLen, &pMeta->pDB->txn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
@ -248,7 +276,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
kLen = nameLen + 1 + sizeof(uid); kLen = nameLen + 1 + sizeof(uid);
pVal = NULL; pVal = NULL;
vLen = 0; vLen = 0;
ret = tdbDbInsert(pMetaDb->pNameIdx, pKey, kLen, pVal, vLen); ret = tdbDbInsert(pMetaDb->pNameIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
@ -259,7 +287,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
kLen = sizeof(uid); kLen = sizeof(uid);
pVal = NULL; pVal = NULL;
vLen = 0; vLen = 0;
ret = tdbDbInsert(pMetaDb->pStbIdx, pKey, kLen, pVal, vLen); ret = tdbDbInsert(pMetaDb->pStbIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
@ -270,7 +298,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
kLen = sizeof(ctbIdxKey); kLen = sizeof(ctbIdxKey);
pVal = NULL; pVal = NULL;
vLen = 0; vLen = 0;
ret = tdbDbInsert(pMetaDb->pCtbIdx, pKey, kLen, pVal, vLen); ret = tdbDbInsert(pMetaDb->pCtbIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
@ -279,12 +307,16 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
kLen = sizeof(uid); kLen = sizeof(uid);
pVal = NULL; pVal = NULL;
vLen = 0; vLen = 0;
ret = tdbDbInsert(pMetaDb->pNtbIdx, pKey, kLen, pVal, vLen); ret = tdbDbInsert(pMetaDb->pNtbIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
} }
if (pMeta->pDB->pPool->size > 0) {
metaCommit(pMeta);
}
return 0; return 0;
} }
@ -349,7 +381,7 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) {
} }
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) { SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
return *metaGetTableSchemaImpl(pMeta, uid, sver, isinline, false); return metaGetTableSchemaImpl(pMeta, uid, sver, isinline, false);
} }
static SSchemaWrapper *metaGetTableSchemaImpl(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline, bool isGetEx) { static SSchemaWrapper *metaGetTableSchemaImpl(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline, bool isGetEx) {
@ -523,7 +555,7 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
return 0; return 0;
} }
pCtbIdxKey = pCtbCur->pVal; pCtbIdxKey = pCtbCur->pKey;
return pCtbIdxKey->uid; return pCtbIdxKey->uid;
} }
@ -701,3 +733,84 @@ static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) {
} }
return buf; return buf;
} }
int metaCommit(SMeta *pMeta) {
TXN *pTxn = &pMeta->pDB->txn;
// Commit current txn
tdbCommit(pMeta->pDB->pEnv, pTxn);
tdbTxnClose(pTxn);
clearPool(pMeta->pDB->pPool);
// start a new txn
tdbTxnOpen(&pMeta->pDB->txn, 0, poolMalloc, poolFree, pMeta->pDB->pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pMeta->pDB->pEnv, pTxn);
return 0;
}
static SPoolMem *openPool() {
SPoolMem *pPool = (SPoolMem *)tdbOsMalloc(sizeof(*pPool));
pPool->prev = pPool->next = pPool;
pPool->size = 0;
return pPool;
}
static void clearPool(SPoolMem *pPool) {
SPoolMem *pMem;
do {
pMem = pPool->next;
if (pMem == pPool) break;
pMem->next->prev = pMem->prev;
pMem->prev->next = pMem->next;
pPool->size -= pMem->size;
tdbOsFree(pMem);
} while (1);
assert(pPool->size == 0);
}
static void closePool(SPoolMem *pPool) {
clearPool(pPool);
tdbOsFree(pPool);
}
static void *poolMalloc(void *arg, size_t size) {
void *ptr = NULL;
SPoolMem *pPool = (SPoolMem *)arg;
SPoolMem *pMem;
pMem = (SPoolMem *)tdbOsMalloc(sizeof(*pMem) + size);
if (pMem == NULL) {
assert(0);
}
pMem->size = sizeof(*pMem) + size;
pMem->next = pPool->next;
pMem->prev = pPool;
pPool->next->prev = pMem;
pPool->next = pMem;
pPool->size += pMem->size;
ptr = (void *)(&pMem[1]);
return ptr;
}
static void poolFree(void *arg, void *ptr) {
SPoolMem *pPool = (SPoolMem *)arg;
SPoolMem *pMem;
pMem = &(((SPoolMem *)ptr)[-1]);
pMem->next->prev = pMem->prev;
pMem->prev->next = pMem->next;
pPool->size -= pMem->size;
tdbOsFree(pMem);
}

View File

@ -27,5 +27,5 @@ void metaCloseUidGnrt(SMeta *pMeta) { /* TODO */
tb_uid_t metaGenerateUid(SMeta *pMeta) { tb_uid_t metaGenerateUid(SMeta *pMeta) {
// Generate a new table UID // Generate a new table UID
return tGenIdPI32(); return tGenIdPI64();
} }

View File

@ -701,7 +701,6 @@ int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray
// Set pIdx // Set pIdx
pBlock = taosArrayGetLast(pSupA); pBlock = taosArrayGetLast(pSupA);
pIdx->tid = TABLE_TID(pTable);
pIdx->uid = TABLE_UID(pTable); pIdx->uid = TABLE_UID(pTable);
pIdx->hasLast = pBlock->last ? 1 : 0; pIdx->hasLast = pBlock->last ? 1 : 0;
pIdx->maxKey = pBlock->keyLast; pIdx->maxKey = pBlock->keyLast;

View File

@ -149,8 +149,8 @@ int tsdbLoadBlockIdx(SReadH *pReadh) {
} }
tsize++; tsize++;
ASSERT(tsize == 1 || ((SBlockIdx *)taosArrayGet(pReadh->aBlkIdx, tsize - 2))->tid < // ASSERT(tsize == 1 || ((SBlockIdx *)taosArrayGet(pReadh->aBlkIdx, tsize - 2))->tid <
((SBlockIdx *)taosArrayGet(pReadh->aBlkIdx, tsize - 1))->tid); // ((SBlockIdx *)taosArrayGet(pReadh->aBlkIdx, tsize - 1))->tid);
} }
return 0; return 0;
@ -180,7 +180,7 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
} }
SBlockIdx *pBlkIdx = taosArrayGet(pReadh->aBlkIdx, pReadh->cidx); SBlockIdx *pBlkIdx = taosArrayGet(pReadh->aBlkIdx, pReadh->cidx);
if (pBlkIdx->tid == TABLE_TID(pTable)) { if (pBlkIdx->uid == TABLE_TID(pTable)) {
if (pBlkIdx->uid == TABLE_UID(pTable)) { if (pBlkIdx->uid == TABLE_UID(pTable)) {
pReadh->pBlkIdx = pBlkIdx; pReadh->pBlkIdx = pBlkIdx;
} else { } else {
@ -188,7 +188,7 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
} }
pReadh->cidx++; pReadh->cidx++;
break; break;
} else if (pBlkIdx->tid > TABLE_TID(pTable)) { } else if (pBlkIdx->uid > TABLE_TID(pTable)) {
pReadh->pBlkIdx = NULL; pReadh->pBlkIdx = NULL;
break; break;
} else { } else {
@ -237,7 +237,7 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) {
return -1; return -1;
} }
ASSERT(pBlkIdx->tid == pReadh->pBlkInfo->tid && pBlkIdx->uid == pReadh->pBlkInfo->uid); // ASSERT(pBlkIdx->tid == pReadh->pBlkInfo->tid && pBlkIdx->uid == pReadh->pBlkInfo->uid);
if (pTarget) { if (pTarget) {
memcpy(pTarget, (void *)(pReadh->pBlkInfo), pBlkIdx->len); memcpy(pTarget, (void *)(pReadh->pBlkInfo), pBlkIdx->len);
@ -275,7 +275,8 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
return 0; return 0;
} }
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds) { int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds,
int numOfColsIds) {
ASSERT(pBlock->numOfSubBlocks > 0); ASSERT(pBlock->numOfSubBlocks > 0);
int8_t update = pReadh->pRepo->config.update; int8_t update = pReadh->pRepo->config.update;
@ -388,7 +389,7 @@ static int tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock) {
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx) { int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx) {
int tlen = 0; int tlen = 0;
tlen += taosEncodeVariantI32(buf, pIdx->tid); // tlen += taosEncodeVariantI32(buf, pIdx->tid);
tlen += taosEncodeVariantU32(buf, pIdx->len); tlen += taosEncodeVariantU32(buf, pIdx->len);
tlen += taosEncodeVariantU32(buf, pIdx->offset); tlen += taosEncodeVariantU32(buf, pIdx->offset);
tlen += taosEncodeFixedU8(buf, pIdx->hasLast); tlen += taosEncodeFixedU8(buf, pIdx->hasLast);
@ -404,7 +405,7 @@ void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) {
uint32_t numOfBlocks = 0; uint32_t numOfBlocks = 0;
uint64_t value = 0; uint64_t value = 0;
if ((buf = taosDecodeVariantI32(buf, &(pIdx->tid))) == NULL) return NULL; // if ((buf = taosDecodeVariantI32(buf, &(pIdx->tid))) == NULL) return NULL;
if ((buf = taosDecodeVariantU32(buf, &(pIdx->len))) == NULL) return NULL; if ((buf = taosDecodeVariantU32(buf, &(pIdx->len))) == NULL) return NULL;
if ((buf = taosDecodeVariantU32(buf, &(pIdx->offset))) == NULL) return NULL; if ((buf = taosDecodeVariantU32(buf, &(pIdx->offset))) == NULL) return NULL;
if ((buf = taosDecodeFixedU8(buf, &(hasLast))) == NULL) return NULL; if ((buf = taosDecodeFixedU8(buf, &(hasLast))) == NULL) return NULL;

View File

@ -47,7 +47,7 @@ int vnodeSyncCommit(SVnode *pVnode) {
static int vnodeCommit(void *arg) { static int vnodeCommit(void *arg) {
SVnode *pVnode = (SVnode *)arg; SVnode *pVnode = (SVnode *)arg;
metaCommit(pVnode->pMeta); // metaCommit(pVnode->pMeta);
tqCommit(pVnode->pTq); tqCommit(pVnode->pTq);
tsdbCommit(pVnode->pTsdb); tsdbCommit(pVnode->pTsdb);

View File

@ -483,6 +483,7 @@ typedef struct STableIntervalOperatorInfo {
SOptrBasicInfo binfo; // basic info SOptrBasicInfo binfo; // basic info
SGroupResInfo groupResInfo; // multiple results build supporter SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info SInterval interval; // interval info
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
STimeWindow win; // query time range STimeWindow win; // query time range
bool timeWindowInterpo; // interpolation needed or not bool timeWindowInterpo; // interpolation needed or not
char **pRow; // previous row/tuple of already processed datablock char **pRow; // previous row/tuple of already processed datablock
@ -670,7 +671,7 @@ SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId); SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot,
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,

View File

@ -297,6 +297,10 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) {
idata.info.slotId = pDescNode->slotId; idata.info.slotId = pDescNode->slotId;
idata.info.precision = pDescNode->dataType.precision; idata.info.precision = pDescNode->dataType.precision;
if (IS_VAR_DATA_TYPE(idata.info.type)) {
pBlock->info.hasVarCol = true;
}
taosArrayPush(pBlock->pDataBlock, &idata); taosArrayPush(pBlock->pDataBlock, &idata);
} }
@ -976,7 +980,7 @@ void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData*
for (int32_t k = 0; k < numOfOutput; ++k) { for (int32_t k = 0; k < numOfOutput; ++k) {
pCtx[k].startTs = pWin->skey; pCtx[k].startTs = pWin->skey;
// keep it temporarialy // keep it temporarily
bool hasAgg = pCtx[k].input.colDataAggIsSet; bool hasAgg = pCtx[k].input.colDataAggIsSet;
int32_t numOfRows = pCtx[k].input.numOfRows; int32_t numOfRows = pCtx[k].input.numOfRows;
int32_t startOffset = pCtx[k].input.startRowIndex; int32_t startOffset = pCtx[k].input.startRowIndex;
@ -1008,7 +1012,6 @@ void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData*
SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData}; SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
pCtx[k].sfp.process(&tw, 1, &out); pCtx[k].sfp.process(&tw, 1, &out);
pEntryInfo->numOfRes = 1; pEntryInfo->numOfRes = 1;
pEntryInfo->hasResult = ',';
continue; continue;
} }
@ -1260,6 +1263,21 @@ static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSData
colDataAssign(pColInfoData, pCtx[k].input.pData[0], pCtx[k].input.numOfRows); colDataAssign(pColInfoData, pCtx[k].input.pData[0], pCtx[k].input.numOfRows);
pResult->info.rows = pCtx[0].input.numOfRows; pResult->info.rows = pCtx[0].input.numOfRows;
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
SVariant *pVal = pExpr->pExpr->pVal;
char *payload;
if (IS_VAR_DATA_TYPE(pVal->nType)) {
payload = taosMemoryCalloc(1, pVal->nLen + VARSTR_HEADER_SIZE);
} else {
payload = taosMemoryCalloc(1, tDataTypes[pVal->nType].bytes);
}
taosVariantDump(pVal, payload, pVal->nType, true);
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k);
for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
colDataAppend(pColInfoData, i, payload, false);
}
taosMemoryFree(payload);
pResult->info.rows = pSrcBlock->info.rows;
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) { } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
taosArrayPush(pBlockList, &pSrcBlock); taosArrayPush(pBlockList, &pSrcBlock);
@ -1507,7 +1525,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
TSKEY* tsCols = NULL; TSKEY* tsCols = NULL;
if (pSDataBlock->pDataBlock != NULL) { if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0); SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData; tsCols = (int64_t*)pColDataInfo->pData;
// assert(tsCols[0] == pSDataBlock->info.window.skey && tsCols[pSDataBlock->info.rows - 1] == // assert(tsCols[0] == pSDataBlock->info.window.skey && tsCols[pSDataBlock->info.rows - 1] ==
// pSDataBlock->info.window.ekey); // pSDataBlock->info.window.ekey);
@ -3140,11 +3158,6 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD
} }
releaseBufPage(pBuf, bufPage); releaseBufPage(pBuf, bufPage);
/*
* set the number of output results for group by normal columns, the number of output rows usually is 1 except
* the top and bottom query
*/
// buf->numOfRows = (uint16_t)getNumOfResult(pCtx, numOfOutput);
} }
} }
@ -3281,6 +3294,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
} }
SFilterInfo* filter = NULL; SFilterInfo* filter = NULL;
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0); int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock}; SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock};
@ -3292,6 +3306,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
SSDataBlock* px = createOneDataBlock(pBlock); SSDataBlock* px = createOneDataBlock(pBlock);
blockDataEnsureCapacity(px, pBlock->info.rows); blockDataEnsureCapacity(px, pBlock->info.rows);
// todo extract method
int32_t numOfRow = 0; int32_t numOfRow = 0;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i); SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i);
@ -3303,7 +3318,11 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
continue; continue;
} }
if (colDataIsNull_s(pSrc, j)) {
colDataAppendNULL(pDst, numOfRow);
} else {
colDataAppend(pDst, numOfRow, colDataGetData(pSrc, j), false); colDataAppend(pDst, numOfRow, colDataGetData(pSrc, j), false);
}
numOfRow += 1; numOfRow += 1;
} }
@ -3521,7 +3540,7 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResI
SResultRowEntryInfo* pEntryInfo = getResultCell(pRow, j, rowCellOffset); SResultRowEntryInfo* pEntryInfo = getResultCell(pRow, j, rowCellOffset);
char* in = GET_ROWCELL_INTERBUF(pEntryInfo); char* in = GET_ROWCELL_INTERBUF(pEntryInfo);
colDataAppend(pColInfoData, nrows, in, pEntryInfo->numOfRes == 0); colDataAppend(pColInfoData, nrows, in, pEntryInfo->isNullRes);
} }
releaseBufPage(pBuf, page); releaseBufPage(pBuf, page);
@ -6351,7 +6370,7 @@ _error:
} }
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot,
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
@ -6367,6 +6386,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->win.skey = 0; pInfo->win.skey = 0;
pInfo->win.ekey = INT64_MAX; pInfo->win.ekey = INT64_MAX;
pInfo->primaryTsIndex = primaryTsSlot;
int32_t numOfRows = 4096; int32_t numOfRows = 4096;
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str); int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str);
initExecTimeWindowInfo(&pInfo->timeWindowData, &pInfo->win); initExecTimeWindowInfo(&pInfo->timeWindowData, &pInfo->win);
@ -6954,6 +6975,16 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName); pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName);
pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pType); pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pType);
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
} else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_VALUE) {
pExp->pExpr->nodeType = QUERY_NODE_VALUE;
SValueNode* pValueNode = (SValueNode*)pTargetNode->pExpr;
SDataType* pType = &pValueNode->node.resType;
char *pDatum = nodesGetValueFromNode(pValueNode);
if (IS_VAR_DATA_TYPE(pType->type)) {
pDatum = varDataVal(pDatum);
}
pExp->pExpr->pVal = taosMemoryCalloc(1, sizeof(SVariant));
taosVariantCreateFromBinary(pExp->pExpr->pVal, pDatum, pType->bytes, pType->type);
} else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_FUNCTION) { } else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_FUNCTION) {
pExp->pExpr->nodeType = QUERY_NODE_FUNCTION; pExp->pExpr->nodeType = QUERY_NODE_FUNCTION;
SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr; SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr;
@ -7133,18 +7164,21 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
// todo: set the correct primary timestamp key column
int32_t num = 0; int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SInterval interval = {.interval = pIntervalPhyNode->interval, SInterval interval = {
.interval = pIntervalPhyNode->interval,
.sliding = pIntervalPhyNode->sliding, .sliding = pIntervalPhyNode->sliding,
.intervalUnit = pIntervalPhyNode->intervalUnit, .intervalUnit = pIntervalPhyNode->intervalUnit,
.slidingUnit = pIntervalPhyNode->slidingUnit, .slidingUnit = pIntervalPhyNode->slidingUnit,
.offset = pIntervalPhyNode->offset, .offset = pIntervalPhyNode->offset,
.precision = TSDB_TIME_PRECISION_MILLI}; .precision = pIntervalPhyNode->precision
return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, pTableGroupInfo, pTaskInfo); };
int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->pTspk)->slotId;
return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo);
} }
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == nodeType(pPhyNode)) { } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == nodeType(pPhyNode)) {
size_t size = LIST_LENGTH(pPhyNode->pChildren); size_t size = LIST_LENGTH(pPhyNode->pChildren);

View File

@ -106,7 +106,7 @@ static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, in
return true; return true;
} }
static void keepGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) { static void recordNewGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
SColumnDataAgg* pColAgg = NULL; SColumnDataAgg* pColAgg = NULL;
for (int32_t i = 0; i < numOfGroupCols; ++i) { for (int32_t i = 0; i < numOfGroupCols; ++i) {
@ -131,7 +131,7 @@ static void keepGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int3
} }
} }
static int32_t generatedHashKey(void* pKey, int32_t* length, SArray* pGroupColVals) { static int32_t buildGroupValKey(void* pKey, int32_t* length, SArray* pGroupColVals) {
ASSERT(pKey != NULL); ASSERT(pKey != NULL);
size_t numOfGroupCols = taosArrayGetSize(pGroupColVals); size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
@ -170,11 +170,12 @@ static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t
char* dest = GET_ROWCELL_INTERBUF(pEntryInfo); char* dest = GET_ROWCELL_INTERBUF(pEntryInfo);
char* data = colDataGetData(pColInfoData, rowIndex); char* data = colDataGetData(pColInfoData, rowIndex);
// set result exists, todo refactor
memcpy(dest, data, pColInfoData->info.bytes); memcpy(dest, data, pColInfoData->info.bytes);
pEntryInfo->hasResult = DATA_SET_FLAG; } else { // it is a NULL value
pEntryInfo->numOfRes = 1; pEntryInfo->isNullRes = 1;
} }
pEntryInfo->numOfRes = 1;
} }
} }
} }
@ -197,7 +198,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
for (int32_t j = 0; j < pBlock->info.rows; ++j) { for (int32_t j = 0; j < pBlock->info.rows; ++j) {
// Compare with the previous row of this column, and do not set the output buffer again if they are identical. // Compare with the previous row of this column, and do not set the output buffer again if they are identical.
if (!pInfo->isInit) { if (!pInfo->isInit) {
keepGroupKeys(pInfo, pBlock, j, numOfGroupCols); recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols);
pInfo->isInit = true; pInfo->isInit = true;
num++; num++;
continue; continue;
@ -209,7 +210,14 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
continue; continue;
} }
/*int32_t ret = */ generatedHashKey(pInfo->keyBuf, &len, pInfo->pGroupColVals); // The first row of a new block does not belongs to the previous existed group
if (!equal && j == 0) {
num++;
recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols);
continue;
}
/*int32_t ret = */ buildGroupValKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
@ -220,12 +228,12 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
// assign the group keys or user input constant values if required // assign the group keys or user input constant values if required
doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex); doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex);
keepGroupKeys(pInfo, pBlock, j, numOfGroupCols); recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols);
num = 1; num = 1;
} }
if (num > 0) { if (num > 0) {
/*int32_t ret = */ generatedHashKey(pInfo->keyBuf, &len, pInfo->pGroupColVals); /*int32_t ret = */ buildGroupValKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
int32_t ret = int32_t ret =
setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len,
0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
@ -288,8 +296,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
while(1) { while(1) {
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pRes, pInfo->binfo.capacity, toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pRes, pInfo->binfo.capacity, pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.rowCellInfoOffset);
doFilter(pInfo->pCondition, pRes); doFilter(pInfo->pCondition, pRes);
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo); bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);

View File

@ -25,7 +25,6 @@
break; \ break; \
} \ } \
(_info)->numOfRes = (res); \ (_info)->numOfRes = (res); \
(_info)->hasResult = DATA_SET_FLAG; \
} while (0) } while (0)
typedef struct SSumRes { typedef struct SSumRes {
@ -49,11 +48,11 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
return true; return true;
} }
static void doFinalizer(SResultRowEntryInfo* pResInfo) { cleanupResultRowEntry(pResInfo); }
void functionFinalize(SqlFunctionCtx *pCtx) { void functionFinalize(SqlFunctionCtx *pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
doFinalizer(pResInfo);
cleanupResultRowEntry(pResInfo);
pResInfo->isNullRes = (pResInfo->numOfRes == 0)? 1:0;
} }
bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
@ -715,7 +714,6 @@ int32_t percentileFunction(SqlFunctionCtx *pCtx) {
} }
SET_VAL(pResInfo, notNullElems, 1); SET_VAL(pResInfo, notNullElems, 1);
pResInfo->hasResult = DATA_SET_FLAG;
} }
// TODO set the correct parameter. // TODO set the correct parameter.
@ -775,9 +773,7 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) {
// DO_UPDATE_TAG_COLUMNS(pCtx, k); // DO_UPDATE_TAG_COLUMNS(pCtx, k);
// } // }
pResInfo->hasResult = DATA_SET_FLAG;
pResInfo->complete = true; pResInfo->complete = true;
numOfElems++; numOfElems++;
break; break;
} }
@ -815,8 +811,6 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) {
// TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0; // TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts); // DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pResInfo->hasResult = DATA_SET_FLAG;
pResInfo->complete = true; // set query completed on this column pResInfo->complete = true; // set query completed on this column
numOfElems++; numOfElems++;
break; break;
@ -830,10 +824,8 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) {
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0; TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
if (pResInfo->hasResult != DATA_SET_FLAG || (*(TSKEY*)buf) < ts) { if (pResInfo->numOfRes == 0 || (*(TSKEY*)buf) < ts) {
pResInfo->hasResult = DATA_SET_FLAG;
memcpy(buf, data, pCtx->inputBytes); memcpy(buf, data, pCtx->inputBytes);
*(TSKEY*)buf = ts; *(TSKEY*)buf = ts;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts); // DO_UPDATE_TAG_COLUMNS(pCtx, ts);
} }

View File

@ -208,7 +208,7 @@ int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i); SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[i]); SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[i]);
if (!pResInfo->hasResult) { if (pResInfo->numOfRes == 0) {
for(int32_t j = 0; j < pResInfo->numOfRes; ++j) { for(int32_t j = 0; j < pResInfo->numOfRes; ++j) {
colDataAppend(pCol, j, NULL, true); // TODO add set null data api colDataAppend(pCol, j, NULL, true); // TODO add set null data api
} }

View File

@ -1050,6 +1050,7 @@ SNode* createColumnDefNode(SAstCreateContext* pCxt, const SToken* pColName, SDat
if (NULL != pComment) { if (NULL != pComment) {
trimString(pComment->z, pComment->n, pCol->comments, sizeof(pCol->comments)); trimString(pComment->z, pComment->n, pCol->comments, sizeof(pCol->comments));
} }
pCol->sma = true;
return (SNode*)pCol; return (SNode*)pCol;
} }

View File

@ -90,14 +90,91 @@ static EDealRes calcConst(SNode** pNode, void* pContext) {
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
} }
static bool isCondition(const SNode* pNode) {
if (QUERY_NODE_OPERATOR == nodeType(pNode)) {
return nodesIsComparisonOp((const SOperatorNode*)pNode);
}
return (QUERY_NODE_LOGIC_CONDITION == nodeType(pNode));
}
static int32_t rewriteIsTrue(SNode* pSrc, SNode** pIsTrue) {
SOperatorNode* pOp = nodesMakeNode(QUERY_NODE_OPERATOR);
if (NULL == pOp) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pOp->opType = OP_TYPE_IS_TRUE;
pOp->pLeft = pSrc;
*pIsTrue = (SNode*)pOp;
return TSDB_CODE_SUCCESS;
}
static EDealRes doRewriteCondition(SNode** pNode, void* pContext) {
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pNode)) {
SNode* pParam = NULL;
FOREACH(pParam, ((SLogicConditionNode*)*pNode)->pParameterList) {
if (!isCondition(pParam)) {
SNode* pIsTrue = NULL;
if (TSDB_CODE_SUCCESS != rewriteIsTrue(pParam, &pIsTrue)) {
((SCalcConstContext*)pContext)->code = TSDB_CODE_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
REPLACE_NODE(pIsTrue);
}
}
}
return DEAL_RES_CONTINUE;
}
static int32_t rewriteCondition(SCalcConstContext* pCxt, SNode** pNode) {
if (!isCondition(*pNode)) {
return rewriteIsTrue(*pNode, pNode);
}
nodesRewriteExprPostOrder(pNode, doRewriteCondition, pCxt);
return pCxt->code;
}
static int32_t rewriteConditionForFromTable(SCalcConstContext* pCxt, SNode* pTable) {
if (QUERY_NODE_JOIN_TABLE == nodeType(pTable)) {
SJoinTableNode* pJoin = (SJoinTableNode*)pTable;
pCxt->code = rewriteConditionForFromTable(pCxt, pJoin->pLeft);
if (TSDB_CODE_SUCCESS == pCxt->code) {
pCxt->code = rewriteConditionForFromTable(pCxt, pJoin->pRight);
}
if (TSDB_CODE_SUCCESS == pCxt->code) {
pCxt->code = rewriteCondition(pCxt, &pJoin->pOnCond);
}
}
return pCxt->code;
}
static int32_t calcConstFromTable(SCalcConstContext* pCxt, SSelectStmt* pSelect) {
nodesRewriteExprPostOrder(&pSelect->pFromTable, calcConst, pCxt);
if (TSDB_CODE_SUCCESS == pCxt->code) {
pCxt->code = rewriteConditionForFromTable(pCxt, pSelect->pFromTable);
}
return pCxt->code;
}
static int32_t calcConstCondition(SCalcConstContext* pCxt, SNode** pCond) {
if (NULL == *pCond) {
return TSDB_CODE_SUCCESS;
}
nodesRewriteExprPostOrder(pCond, calcConst, pCxt);
if (TSDB_CODE_SUCCESS == pCxt->code) {
pCxt->code = rewriteCondition(pCxt, pCond);
}
return pCxt->code;
}
static int32_t calcConstSelect(SSelectStmt* pSelect) { static int32_t calcConstSelect(SSelectStmt* pSelect) {
SCalcConstContext cxt = { .code = TSDB_CODE_SUCCESS }; SCalcConstContext cxt = { .code = TSDB_CODE_SUCCESS };
nodesRewriteExprsPostOrder(pSelect->pProjectionList, calcConst, &cxt); nodesRewriteExprsPostOrder(pSelect->pProjectionList, calcConst, &cxt);
if (TSDB_CODE_SUCCESS == cxt.code) { if (TSDB_CODE_SUCCESS == cxt.code) {
nodesRewriteExprPostOrder(&pSelect->pFromTable, calcConst, &cxt); cxt.code = calcConstFromTable(&cxt, pSelect);
} }
if (TSDB_CODE_SUCCESS == cxt.code) { if (TSDB_CODE_SUCCESS == cxt.code) {
nodesRewriteExprPostOrder(&pSelect->pWhere, calcConst, &cxt); cxt.code = calcConstCondition(&cxt, &pSelect->pWhere);
} }
if (TSDB_CODE_SUCCESS == cxt.code) { if (TSDB_CODE_SUCCESS == cxt.code) {
nodesRewriteExprsPostOrder(pSelect->pPartitionByList, calcConst, &cxt); nodesRewriteExprsPostOrder(pSelect->pPartitionByList, calcConst, &cxt);
@ -109,7 +186,7 @@ static int32_t calcConstSelect(SSelectStmt* pSelect) {
nodesRewriteExprsPostOrder(pSelect->pGroupByList, calcConst, &cxt); nodesRewriteExprsPostOrder(pSelect->pGroupByList, calcConst, &cxt);
} }
if (TSDB_CODE_SUCCESS == cxt.code) { if (TSDB_CODE_SUCCESS == cxt.code) {
nodesRewriteExprPostOrder(&pSelect->pHaving, calcConst, &cxt); cxt.code = calcConstCondition(&cxt, &pSelect->pHaving);
} }
if (TSDB_CODE_SUCCESS == cxt.code) { if (TSDB_CODE_SUCCESS == cxt.code) {
nodesRewriteExprsPostOrder(pSelect->pOrderByList, calcConst, &cxt); nodesRewriteExprsPostOrder(pSelect->pOrderByList, calcConst, &cxt);

View File

@ -1119,7 +1119,7 @@ static int32_t columnNodeToField(SNodeList* pList, SArray** pArray) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static const SColumnDefNode* findColDef(const SNodeList* pCols, const SColumnNode* pCol) { static SColumnDefNode* findColDef(SNodeList* pCols, const SColumnNode* pCol) {
SNode* pColDef = NULL; SNode* pColDef = NULL;
FOREACH(pColDef, pCols) { FOREACH(pColDef, pCols) {
if (0 == strcmp(pCol->colName, ((SColumnDefNode*)pColDef)->colName)) { if (0 == strcmp(pCol->colName, ((SColumnDefNode*)pColDef)->colName)) {
@ -1129,16 +1129,20 @@ static const SColumnDefNode* findColDef(const SNodeList* pCols, const SColumnNod
return NULL; return NULL;
} }
static int32_t checkCreateSuperTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) { static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) {
if (NULL != pStmt->pOptions->pSma) { if (NULL != pStmt->pOptions->pSma) {
SNode* pNode = NULL; SNode* pNode = NULL;
FOREACH(pNode, pStmt->pCols) {
((SColumnDefNode*)pNode)->sma = false;
}
FOREACH(pNode, pStmt->pOptions->pSma) { FOREACH(pNode, pStmt->pOptions->pSma) {
SColumnNode* pSmaCol = (SColumnNode*)pNode; SColumnNode* pSmaCol = (SColumnNode*)pNode;
const SColumnDefNode* pColDef = findColDef(pStmt->pCols, pSmaCol); SColumnDefNode* pColDef = findColDef(pStmt->pCols, pSmaCol);
if (NULL == pColDef) { if (NULL == pColDef) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, pSmaCol->colName); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, pSmaCol->colName);
} }
pSmaCol->node.resType = pColDef->dataType; pSmaCol->node.resType = pColDef->dataType;
pColDef->sma = true;
} }
} }
if (NULL != pStmt->pOptions->pFuncs) { if (NULL != pStmt->pOptions->pFuncs) {
@ -1158,7 +1162,7 @@ static int32_t getAggregationMethod(SNodeList* pFuncs) {
} }
static int32_t translateCreateSuperTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) { static int32_t translateCreateSuperTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) {
int32_t code = checkCreateSuperTable(pCxt, pStmt); int32_t code = checkCreateTable(pCxt, pStmt);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
} }
@ -1489,7 +1493,7 @@ static int32_t nodeTypeToShowType(ENodeType nt) {
case QUERY_NODE_SHOW_CONNECTIONS_STMT: case QUERY_NODE_SHOW_CONNECTIONS_STMT:
return TSDB_MGMT_TABLE_CONNS; return TSDB_MGMT_TABLE_CONNS;
case QUERY_NODE_SHOW_LICENCE_STMT: case QUERY_NODE_SHOW_LICENCE_STMT:
return 0; // todo return TSDB_MGMT_TABLE_GRANTS;
case QUERY_NODE_SHOW_QUERIES_STMT: case QUERY_NODE_SHOW_QUERIES_STMT:
return TSDB_MGMT_TABLE_QUERIES; return TSDB_MGMT_TABLE_QUERIES;
case QUERY_NODE_SHOW_SCORES_STMT: case QUERY_NODE_SHOW_SCORES_STMT:
@ -2161,11 +2165,11 @@ typedef struct SVgroupTablesBatch {
char dbName[TSDB_DB_NAME_LEN]; char dbName[TSDB_DB_NAME_LEN];
} SVgroupTablesBatch; } SVgroupTablesBatch;
static void toSchema(const SColumnDefNode* pCol, col_id_t colId, SSchemaEx* pSchema) { static void toSchemaEx(const SColumnDefNode* pCol, col_id_t colId, SSchemaEx* pSchema) {
pSchema->colId = colId; pSchema->colId = colId;
pSchema->type = pCol->dataType.type; pSchema->type = pCol->dataType.type;
pSchema->bytes = calcTypeBytes(pCol->dataType); pSchema->bytes = calcTypeBytes(pCol->dataType);
pSchema->sma = TSDB_BSMA_TYPE_LATEST; // TODO: use default value currently, and use the real value later. pSchema->sma = pCol->sma ? TSDB_BSMA_TYPE_LATEST : TSDB_BSMA_TYPE_NONE;
strcpy(pSchema->name, pCol->colName); strcpy(pSchema->name, pCol->colName);
} }
@ -2175,33 +2179,60 @@ static void destroyCreateTbReq(SVCreateTbReq* pReq) {
taosMemoryFreeClear(pReq->ntbCfg.pSchema); taosMemoryFreeClear(pReq->ntbCfg.pSchema);
} }
static int32_t buildNormalTableBatchReq(int32_t acctId, const char* pDbName, const char* pTableName, static int32_t buildSmaParam(STableOptions* pOptions, SVCreateTbReq* pReq) {
const SNodeList* pColumns, const SVgroupInfo* pVgroupInfo, SVgroupTablesBatch* pBatch) { if (0 == LIST_LENGTH(pOptions->pFuncs)) {
return TSDB_CODE_SUCCESS;
}
pReq->ntbCfg.pRSmaParam = taosMemoryCalloc(1, sizeof(SRSmaParam));
if (NULL == pReq->ntbCfg.pRSmaParam) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pReq->ntbCfg.pRSmaParam->delay = pOptions->delay;
pReq->ntbCfg.pRSmaParam->xFilesFactor = pOptions->filesFactor;
pReq->ntbCfg.pRSmaParam->nFuncIds = LIST_LENGTH(pOptions->pFuncs);
pReq->ntbCfg.pRSmaParam->pFuncIds = taosMemoryCalloc(pReq->ntbCfg.pRSmaParam->nFuncIds, sizeof(func_id_t));
if (NULL == pReq->ntbCfg.pRSmaParam->pFuncIds) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t index = 0;
SNode* pFunc = NULL;
FOREACH(pFunc, pOptions->pFuncs) {
pReq->ntbCfg.pRSmaParam->pFuncIds[index++] = ((SFunctionNode*)pFunc)->funcId;
}
return TSDB_CODE_SUCCESS;
}
static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* pStmt, const SVgroupInfo* pVgroupInfo, SVgroupTablesBatch* pBatch) {
char dbFName[TSDB_DB_FNAME_LEN] = {0}; char dbFName[TSDB_DB_FNAME_LEN] = {0};
SName name = { .type = TSDB_DB_NAME_T, .acctId = acctId }; SName name = { .type = TSDB_DB_NAME_T, .acctId = acctId };
strcpy(name.dbname, pDbName); strcpy(name.dbname, pStmt->dbName);
tNameGetFullDbName(&name, dbFName); tNameGetFullDbName(&name, dbFName);
SVCreateTbReq req = {0}; SVCreateTbReq req = {0};
req.type = TD_NORMAL_TABLE; req.type = TD_NORMAL_TABLE;
req.dbFName = strdup(dbFName); req.dbFName = strdup(dbFName);
req.name = strdup(pTableName); req.name = strdup(pStmt->tableName);
req.ntbCfg.nCols = LIST_LENGTH(pColumns); req.ntbCfg.nCols = LIST_LENGTH(pStmt->pCols);
req.ntbCfg.pSchema = taosMemoryCalloc(req.ntbCfg.nCols, sizeof(SSchemaEx)); req.ntbCfg.pSchema = taosMemoryCalloc(req.ntbCfg.nCols, sizeof(SSchemaEx));
if (NULL == req.name || NULL == req.ntbCfg.pSchema) { if (NULL == req.name || NULL == req.ntbCfg.pSchema) {
destroyCreateTbReq(&req); destroyCreateTbReq(&req);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
SNode* pCol; SNode* pCol;
int32_t index = 0; col_id_t index = 0;
FOREACH(pCol, pColumns) { FOREACH(pCol, pStmt->pCols) {
toSchema((SColumnDefNode*)pCol, index + 1, req.ntbCfg.pSchema + index); toSchemaEx((SColumnDefNode*)pCol, index + 1, req.ntbCfg.pSchema + index);
++index; ++index;
} }
// TODO: use the real sma for normal table. if (TSDB_CODE_SUCCESS != buildSmaParam(pStmt->pOptions, &req)) {
destroyCreateTbReq(&req);
return TSDB_CODE_OUT_OF_MEMORY;
}
pBatch->info = *pVgroupInfo; pBatch->info = *pVgroupInfo;
strcpy(pBatch->dbName, pDbName); strcpy(pBatch->dbName, pStmt->dbName);
pBatch->req.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq)); pBatch->req.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
if (NULL == pBatch->req.pArray) { if (NULL == pBatch->req.pArray) {
destroyCreateTbReq(&req); destroyCreateTbReq(&req);
@ -2282,7 +2313,7 @@ static int32_t buildCreateTableDataBlock(int32_t acctId, const SCreateTableStmt*
} }
SVgroupTablesBatch tbatch = {0}; SVgroupTablesBatch tbatch = {0};
int32_t code = buildNormalTableBatchReq(acctId, pStmt->dbName, pStmt->tableName, pStmt->pCols, pInfo, &tbatch); int32_t code = buildNormalTableBatchReq(acctId, pStmt, pInfo, &tbatch);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = serializeVgroupTablesBatch(&tbatch, *pBufArray); code = serializeVgroupTablesBatch(&tbatch, *pBufArray);
} }
@ -2297,8 +2328,11 @@ static int32_t buildCreateTableDataBlock(int32_t acctId, const SCreateTableStmt*
static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) { static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
SCreateTableStmt* pStmt = (SCreateTableStmt*)pQuery->pRoot; SCreateTableStmt* pStmt = (SCreateTableStmt*)pQuery->pRoot;
int32_t code = checkCreateTable(pCxt, pStmt);
SVgroupInfo info = {0}; SVgroupInfo info = {0};
int32_t code = getTableHashVgroup(pCxt, pStmt->dbName, pStmt->tableName, &info); if (TSDB_CODE_SUCCESS == code) {
code = getTableHashVgroup(pCxt, pStmt->dbName, pStmt->tableName, &info);
}
SArray* pBufArray = NULL; SArray* pBufArray = NULL;
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildCreateTableDataBlock(pCxt->pParseCxt->acctId, pStmt, &info, &pBufArray); code = buildCreateTableDataBlock(pCxt->pParseCxt->acctId, pStmt, &info, &pBufArray);

View File

@ -44,7 +44,7 @@ protected:
query_ = nullptr; query_ = nullptr;
bool res = runImpl(parseCode, translateCode); bool res = runImpl(parseCode, translateCode);
qDestroyQuery(query_); qDestroyQuery(query_);
if (!res) { if (1/*!res*/) {
dump(); dump();
} }
return res; return res;
@ -69,6 +69,12 @@ private:
return (terrno == translateCode); return (terrno == translateCode);
} }
translatedAstStr_ = toString(query_->pRoot); translatedAstStr_ = toString(query_->pRoot);
code = calculateConstant(&cxt_, query_);
if (code != TSDB_CODE_SUCCESS) {
calcConstErrStr_ = string("code:") + tstrerror(code) + string(", msg:") + errMagBuf_;
return false;
}
calcConstAstStr_ = toString(query_->pRoot);
return (TSDB_CODE_SUCCESS == translateCode); return (TSDB_CODE_SUCCESS == translateCode);
} }
@ -88,6 +94,13 @@ private:
cout << "translate output: " << endl; cout << "translate output: " << endl;
cout << translatedAstStr_ << endl; cout << translatedAstStr_ << endl;
} }
if (!calcConstErrStr_.empty()) {
cout << "calculateConstant error: " << calcConstErrStr_ << endl;
}
if (!calcConstAstStr_.empty()) {
cout << "calculateConstant output: " << endl;
cout << calcConstAstStr_ << endl;
}
} }
string toString(const SNode* pRoot, bool format = false) { string toString(const SNode* pRoot, bool format = false) {
@ -112,6 +125,8 @@ private:
parsedAstStr_.clear(); parsedAstStr_.clear();
translateErrStr_.clear(); translateErrStr_.clear();
translatedAstStr_.clear(); translatedAstStr_.clear();
calcConstErrStr_.clear();
calcConstAstStr_.clear();
} }
string acctId_; string acctId_;
@ -124,6 +139,8 @@ private:
string parsedAstStr_; string parsedAstStr_;
string translateErrStr_; string translateErrStr_;
string translatedAstStr_; string translatedAstStr_;
string calcConstErrStr_;
string calcConstAstStr_;
}; };
TEST_F(ParserTest, createAccount) { TEST_F(ParserTest, createAccount) {
@ -191,6 +208,9 @@ TEST_F(ParserTest, selectConstant) {
bind("SELECT 1234567890123456789012345678901234567890, 20.1234567890123456789012345678901234567890, 'abc', \"wxy\", TIMESTAMP '2022-02-09 17:30:20', true, false, 15s FROM t1"); bind("SELECT 1234567890123456789012345678901234567890, 20.1234567890123456789012345678901234567890, 'abc', \"wxy\", TIMESTAMP '2022-02-09 17:30:20', true, false, 15s FROM t1");
ASSERT_TRUE(run()); ASSERT_TRUE(run());
bind("SELECT 123 + 45 FROM t1 where 2 - 1");
ASSERT_TRUE(run());
} }
TEST_F(ParserTest, selectExpression) { TEST_F(ParserTest, selectExpression) {

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -1,27 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TDB_BTREE_INT_H_
#define _TDB_BTREE_INT_H_
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
#endif /*_TDB_BTREE_INT_H_*/

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -127,7 +127,7 @@ int tdbBtreeClose(SBTree *pBt) {
return 0; return 0;
} }
int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, int vLen) { int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn) {
SBTC btc; SBTC btc;
SCell *pCell; SCell *pCell;
void *pBuf; void *pBuf;
@ -137,7 +137,7 @@ int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, in
int idx; int idx;
int c; int c;
tdbBtcOpen(&btc, pBt); tdbBtcOpen(&btc, pBt, pTxn);
// move to the position to insert // move to the position to insert
ret = tdbBtcMoveTo(&btc, pKey, kLen, &c); ret = tdbBtcMoveTo(&btc, pKey, kLen, &c);
@ -225,7 +225,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
void *pTVal = NULL; void *pTVal = NULL;
SCellDecoder cd; SCellDecoder cd;
tdbBtcOpen(&btc, pBt); tdbBtcOpen(&btc, pBt, NULL);
ret = tdbBtcMoveTo(&btc, pKey, kLen, &cret); ret = tdbBtcMoveTo(&btc, pKey, kLen, &cret);
if (ret < 0) { if (ret < 0) {
@ -233,7 +233,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
ASSERT(0); ASSERT(0);
} }
if (cret) { if (btc.idx < 0 || cret) {
tdbBtcClose(&btc); tdbBtcClose(&btc);
return -1; return -1;
} }
@ -253,6 +253,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
memcpy(*ppKey, cd.pKey, cd.kLen); memcpy(*ppKey, cd.pKey, cd.kLen);
} }
if (ppVal) {
pTVal = TDB_REALLOC(*ppVal, cd.vLen); pTVal = TDB_REALLOC(*ppVal, cd.vLen);
if (pTVal == NULL) { if (pTVal == NULL) {
tdbBtcClose(&btc); tdbBtcClose(&btc);
@ -262,6 +263,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
*ppVal = pTVal; *ppVal = pTVal;
*vLen = cd.vLen; *vLen = cd.vLen;
memcpy(*ppVal, cd.pVal, cd.vLen); memcpy(*ppVal, cd.pVal, cd.vLen);
}
tdbBtcClose(&btc); tdbBtcClose(&btc);
@ -297,7 +299,8 @@ static int tdbBtreeOpenImpl(SBTree *pBt) {
{ {
// 1. TODO: Search the main DB to check if the DB exists // 1. TODO: Search the main DB to check if the DB exists
pgno = 0; ret = tdbPagerOpenDB(pBt->pPager, &pgno, true);
ASSERT(ret == 0);
} }
if (pgno != 0) { if (pgno != 0) {
@ -307,13 +310,13 @@ static int tdbBtreeOpenImpl(SBTree *pBt) {
// Try to create a new database // Try to create a new database
SBtreeInitPageArg zArg = {.flags = TDB_BTREE_ROOT | TDB_BTREE_LEAF, .pBt = pBt}; SBtreeInitPageArg zArg = {.flags = TDB_BTREE_ROOT | TDB_BTREE_LEAF, .pBt = pBt};
ret = tdbPagerNewPage(pBt->pPager, &pgno, &pPage, tdbBtreeZeroPage, &zArg); ret = tdbPagerNewPage(pBt->pPager, &pgno, &pPage, tdbBtreeZeroPage, &zArg, NULL);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
// TODO: here still has problem // TODO: here still has problem
tdbPagerReturnPage(pBt->pPager, pPage); tdbPagerReturnPage(pBt->pPager, pPage, NULL);
ASSERT(pgno != 0); ASSERT(pgno != 0);
pBt->root = pgno; pBt->root = pgno;
@ -385,7 +388,7 @@ static int tdbBtreeZeroPage(SPage *pPage, void *arg) {
} }
// TDB_BTREE_BALANCE ===================== // TDB_BTREE_BALANCE =====================
static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild) { static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild, TXN *pTxn) {
SPager *pPager; SPager *pPager;
SPage *pChild; SPage *pChild;
SPgno pgnoChild; SPgno pgnoChild;
@ -402,7 +405,7 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild) {
// Allocate a new child page // Allocate a new child page
zArg.flags = TDB_FLAG_REMOVE(flags, TDB_BTREE_ROOT); zArg.flags = TDB_FLAG_REMOVE(flags, TDB_BTREE_ROOT);
zArg.pBt = pBt; zArg.pBt = pBt;
ret = tdbPagerNewPage(pPager, &pgnoChild, &pChild, tdbBtreeZeroPage, &zArg); ret = tdbPagerNewPage(pPager, &pgnoChild, &pChild, tdbBtreeZeroPage, &zArg, pTxn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
@ -436,7 +439,7 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild) {
return 0; return 0;
} }
static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTxn) {
int ret; int ret;
int nOlds; int nOlds;
@ -477,7 +480,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
pgno = *(SPgno *)pCell; pgno = *(SPgno *)pCell;
} }
ret = tdbPagerFetchPage(pBt->pPager, pgno, pOlds + i, tdbBtreeInitPage, pBt); ret = tdbPagerFetchPage(pBt->pPager, pgno, pOlds + i, tdbBtreeInitPage, pBt, pTxn);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
@ -640,7 +643,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
} else { } else {
iarg.pBt = pBt; iarg.pBt = pBt;
iarg.flags = flags; iarg.flags = flags;
ret = tdbPagerNewPage(pBt->pPager, &pgno, pNews + iNew, tdbBtreeZeroPage, &iarg); ret = tdbPagerNewPage(pBt->pPager, &pgno, pNews + iNew, tdbBtreeZeroPage, &iarg, pTxn);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
} }
@ -767,9 +770,9 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
// TODO: here is not corrent for drop case // TODO: here is not corrent for drop case
for (int i = 0; i < nNews; i++) { for (int i = 0; i < nNews; i++) {
if (i < nOlds) { if (i < nOlds) {
tdbPagerReturnPage(pBt->pPager, pOlds[i]); tdbPagerReturnPage(pBt->pPager, pOlds[i], pTxn);
} else { } else {
tdbPagerReturnPage(pBt->pPager, pNews[i]); tdbPagerReturnPage(pBt->pPager, pNews[i], pTxn);
} }
} }
@ -805,7 +808,7 @@ static int tdbBtreeBalance(SBTC *pBtc) {
// ignore the case of empty // ignore the case of empty
if (pPage->nOverflow == 0) break; if (pPage->nOverflow == 0) break;
ret = tdbBtreeBalanceDeeper(pBtc->pBt, pPage, &(pBtc->pgStack[1])); ret = tdbBtreeBalanceDeeper(pBtc->pBt, pPage, &(pBtc->pgStack[1]), pBtc->pTxn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
@ -819,12 +822,12 @@ static int tdbBtreeBalance(SBTC *pBtc) {
// Generalized balance step // Generalized balance step
pParent = pBtc->pgStack[iPage - 1]; pParent = pBtc->pgStack[iPage - 1];
ret = tdbBtreeBalanceNonRoot(pBtc->pBt, pParent, pBtc->idxStack[pBtc->iPage - 1]); ret = tdbBtreeBalanceNonRoot(pBtc->pBt, pParent, pBtc->idxStack[pBtc->iPage - 1], pBtc->pTxn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage); tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage, pBtc->pTxn);
pBtc->iPage--; pBtc->iPage--;
pBtc->pPage = pBtc->pgStack[pBtc->iPage]; pBtc->pPage = pBtc->pgStack[pBtc->iPage];
@ -1024,11 +1027,12 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell) {
// TDB_BTREE_CELL // TDB_BTREE_CELL
// TDB_BTREE_CURSOR ===================== // TDB_BTREE_CURSOR =====================
int tdbBtcOpen(SBTC *pBtc, SBTree *pBt) { int tdbBtcOpen(SBTC *pBtc, SBTree *pBt, TXN *pTxn) {
pBtc->pBt = pBt; pBtc->pBt = pBt;
pBtc->iPage = -1; pBtc->iPage = -1;
pBtc->pPage = NULL; pBtc->pPage = NULL;
pBtc->idx = -1; pBtc->idx = -1;
pBtc->pTxn = pTxn;
return 0; return 0;
} }
@ -1045,7 +1049,7 @@ int tdbBtcMoveToFirst(SBTC *pBtc) {
if (pBtc->iPage < 0) { if (pBtc->iPage < 0) {
// move a clean cursor // move a clean cursor
ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt); ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt, pBtc->pTxn);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
@ -1110,7 +1114,7 @@ int tdbBtcMoveToLast(SBTC *pBtc) {
if (pBtc->iPage < 0) { if (pBtc->iPage < 0) {
// move a clean cursor // move a clean cursor
ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt); ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt, pBtc->pTxn);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
@ -1284,7 +1288,7 @@ static int tdbBtcMoveDownward(SBTC *pBtc) {
pBtc->pPage = NULL; pBtc->pPage = NULL;
pBtc->idx = -1; pBtc->idx = -1;
ret = tdbPagerFetchPage(pBtc->pBt->pPager, pgno, &pBtc->pPage, tdbBtreeInitPage, pBtc->pBt); ret = tdbPagerFetchPage(pBtc->pBt->pPager, pgno, &pBtc->pPage, tdbBtreeInitPage, pBtc->pBt, pBtc->pTxn);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
@ -1296,7 +1300,7 @@ static int tdbBtcMoveDownward(SBTC *pBtc) {
static int tdbBtcMoveUpward(SBTC *pBtc) { static int tdbBtcMoveUpward(SBTC *pBtc) {
if (pBtc->iPage == 0) return -1; if (pBtc->iPage == 0) return -1;
tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage); tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage, pBtc->pTxn);
pBtc->iPage--; pBtc->iPage--;
pBtc->pPage = pBtc->pgStack[pBtc->iPage]; pBtc->pPage = pBtc->pgStack[pBtc->iPage];
@ -1319,7 +1323,7 @@ static int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
if (pBtc->iPage < 0) { if (pBtc->iPage < 0) {
// move from a clear cursor // move from a clear cursor
ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt); ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt, pBtc->pTxn);
if (ret < 0) { if (ret < 0) {
// TODO // TODO
ASSERT(0); ASSERT(0);
@ -1456,7 +1460,7 @@ int tdbBtcClose(SBTC *pBtc) {
for (;;) { for (;;) {
ASSERT(pBtc->pPage); ASSERT(pBtc->pPage);
tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage); tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage, pBtc->pTxn);
pBtc->iPage--; pBtc->iPage--;
if (pBtc->iPage < 0) break; if (pBtc->iPage < 0) break;

View File

@ -75,8 +75,8 @@ int tdbDbDrop(TDB *pDb) {
return 0; return 0;
} }
int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen) { int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) {
return tdbBtreeInsert(pDb->pBt, pKey, keyLen, pVal, valLen); return tdbBtreeInsert(pDb->pBt, pKey, keyLen, pVal, valLen, pTxn);
} }
int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen) { int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen) {
@ -97,7 +97,7 @@ int tdbDbcOpen(TDB *pDb, TDBC **ppDbc) {
return -1; return -1;
} }
tdbBtcOpen(&pDbc->btc, pDb->pBt); tdbBtcOpen(&pDbc->btc, pDb->pBt, NULL);
// TODO: move to first now, we can move to any key-value // TODO: move to first now, we can move to any key-value
// and in any direction, design new APIs. // and in any direction, design new APIs.

View File

@ -73,12 +73,12 @@ int tdbEnvClose(TENV *pEnv) {
return 0; return 0;
} }
int tdbBegin(TENV *pEnv) { int tdbBegin(TENV *pEnv, TXN *pTxn) {
SPager *pPager; SPager *pPager;
int ret; int ret;
for (pPager = pEnv->pgrList; pPager; pPager = pPager->pNext) { for (pPager = pEnv->pgrList; pPager; pPager = pPager->pNext) {
ret = tdbPagerBegin(pPager); ret = tdbPagerBegin(pPager, pTxn);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
@ -88,12 +88,12 @@ int tdbBegin(TENV *pEnv) {
return 0; return 0;
} }
int tdbCommit(TENV *pEnv) { int tdbCommit(TENV *pEnv, TXN *pTxn) {
SPager *pPager; SPager *pPager;
int ret; int ret;
for (pPager = pEnv->pgrList; pPager; pPager = pPager->pNext) { for (pPager = pEnv->pgrList; pPager; pPager = pPager->pNext) {
ret = tdbPagerCommit(pPager); ret = tdbPagerCommit(pPager, pTxn);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
@ -103,7 +103,7 @@ int tdbCommit(TENV *pEnv) {
return 0; return 0;
} }
int tdbRollback(TENV *pEnv) { int tdbRollback(TENV *pEnv, TXN *pTxn) {
ASSERT(0); ASSERT(0);
return 0; return 0;
} }

View File

@ -18,6 +18,7 @@ struct SPCache {
int pageSize; int pageSize;
int cacheSize; int cacheSize;
tdb_mutex_t mutex; tdb_mutex_t mutex;
SPage *pList;
int nFree; int nFree;
SPage *pFree; SPage *pFree;
int nPage; int nPage;
@ -35,16 +36,17 @@ struct SPCache {
#define PAGE_IS_PINNED(pPage) ((pPage)->pLruNext == NULL) #define PAGE_IS_PINNED(pPage) ((pPage)->pLruNext == NULL)
static int tdbPCacheOpenImpl(SPCache *pCache); static int tdbPCacheOpenImpl(SPCache *pCache);
static void tdbPCacheInitLock(SPCache *pCache); static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn);
static void tdbPCacheClearLock(SPCache *pCache);
static void tdbPCacheLock(SPCache *pCache);
static void tdbPCacheUnlock(SPCache *pCache);
static bool tdbPCacheLocked(SPCache *pCache);
static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, bool alcNewPage);
static void tdbPCachePinPage(SPCache *pCache, SPage *pPage); static void tdbPCachePinPage(SPCache *pCache, SPage *pPage);
static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage); static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage);
static void tdbPCacheAddPageToHash(SPCache *pCache, SPage *pPage); static void tdbPCacheAddPageToHash(SPCache *pCache, SPage *pPage);
static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage); static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage);
static int tdbPCacheCloseImpl(SPCache *pCache);
static void tdbPCacheInitLock(SPCache *pCache) { tdbMutexInit(&(pCache->mutex), NULL); }
static void tdbPCacheDestroyLock(SPCache *pCache) { tdbMutexDestroy(&(pCache->mutex)); }
static void tdbPCacheLock(SPCache *pCache) { tdbMutexLock(&(pCache->mutex)); }
static void tdbPCacheUnlock(SPCache *pCache) { tdbMutexUnlock(&(pCache->mutex)); }
int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache) { int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache) {
SPCache *pCache; SPCache *pCache;
@ -69,16 +71,19 @@ int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache) {
} }
int tdbPCacheClose(SPCache *pCache) { int tdbPCacheClose(SPCache *pCache) {
/* TODO */ if (pCache) {
tdbPCacheCloseImpl(pCache);
tdbOsFree(pCache);
}
return 0; return 0;
} }
SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, bool alcNewPage) { SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) {
SPage *pPage; SPage *pPage;
tdbPCacheLock(pCache); tdbPCacheLock(pCache);
pPage = tdbPCacheFetchImpl(pCache, pPgid, alcNewPage); pPage = tdbPCacheFetchImpl(pCache, pPgid, pTxn);
if (pPage) { if (pPage) {
TDB_REF_PAGE(pPage); TDB_REF_PAGE(pPage);
} }
@ -88,32 +93,40 @@ SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, bool alcNewPage) {
return pPage; return pPage;
} }
void tdbPCacheRelease(SPCache *pCache, SPage *pPage) { void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn) {
i32 nRef; i32 nRef;
nRef = TDB_UNREF_PAGE(pPage); nRef = TDB_UNREF_PAGE(pPage);
ASSERT(nRef >= 0); ASSERT(nRef >= 0);
if (nRef == 0) { if (nRef == 0) {
tdbPCacheLock(pCache);
// test the nRef again to make sure
// it is safe th handle the page
nRef = TDB_GET_PAGE_REF(pPage);
if (nRef == 0) {
if (pPage->isLocal) {
tdbPCacheUnpinPage(pCache, pPage); tdbPCacheUnpinPage(pCache, pPage);
} else {
// remove from hash
tdbPCacheRemovePageFromHash(pCache, pPage);
// free the page
if (pTxn && pTxn->xFree) {
tdbPageDestroy(pPage, pTxn->xFree, pTxn->xArg);
}
} }
} }
static void tdbPCacheInitLock(SPCache *pCache) { tdbMutexInit(&(pCache->mutex), NULL); } tdbPCacheUnlock(pCache);
}
static void tdbPCacheClearLock(SPCache *pCache) { tdbMutexDestroy(&(pCache->mutex)); }
static void tdbPCacheLock(SPCache *pCache) { tdbMutexLock(&(pCache->mutex)); }
static void tdbPCacheUnlock(SPCache *pCache) { tdbMutexUnlock(&(pCache->mutex)); }
static bool tdbPCacheLocked(SPCache *pCache) {
assert(0);
// TODO
return true;
} }
static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, bool alcNewPage) { int tdbPCacheGetPageSize(SPCache *pCache) { return pCache->pageSize; }
static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) {
int ret;
SPage *pPage; SPage *pPage;
// 1. Search the hash table // 1. Search the hash table
@ -123,10 +136,10 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, bool alcNe
pPage = pPage->pHashNext; pPage = pPage->pHashNext;
} }
if (pPage || !alcNewPage) {
if (pPage) { if (pPage) {
// TODO: the page need to be copied and
// replaced the page in hash table
tdbPCachePinPage(pCache, pPage); tdbPCachePinPage(pCache, pPage);
}
return pPage; return pPage;
} }
@ -145,7 +158,20 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, bool alcNe
tdbPCachePinPage(pCache, pPage); tdbPCachePinPage(pCache, pPage);
} }
// 4. Try a stress allocation (TODO) // 4. Try a create new page
if (!pPage && pTxn && pTxn->xMalloc) {
ret = tdbPageCreate(pCache->pageSize, &pPage, pTxn->xMalloc, pTxn->xArg);
if (ret < 0) {
// TODO
ASSERT(0);
return NULL;
}
// init the page fields
pPage->isAnchor = 0;
pPage->isLocal = 0;
TDB_INIT_PAGE_REF(pPage);
}
// 5. Page here are just created from a free list // 5. Page here are just created from a free list
// or by recycling or allocated streesly, // or by recycling or allocated streesly,
@ -154,6 +180,8 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, bool alcNe
memcpy(&(pPage->pgid), pPgid, sizeof(*pPgid)); memcpy(&(pPage->pgid), pPgid, sizeof(*pPgid));
pPage->pLruNext = NULL; pPage->pLruNext = NULL;
pPage->pPager = NULL; pPage->pPager = NULL;
// TODO: allocated page may not add to hash
tdbPCacheAddPageToHash(pCache, pPage); tdbPCacheAddPageToHash(pCache, pPage);
} }
@ -173,25 +201,17 @@ static void tdbPCachePinPage(SPCache *pCache, SPage *pPage) {
static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage) { static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage) {
i32 nRef; i32 nRef;
tdbPCacheLock(pCache);
ASSERT(!pPage->isDirty); ASSERT(!pPage->isDirty);
ASSERT(TDB_GET_PAGE_REF(pPage) == 0);
nRef = TDB_GET_PAGE_REF(pPage);
ASSERT(nRef >= 0);
if (nRef == 0) {
// Add the page to LRU list
ASSERT(pPage->pLruNext == NULL); ASSERT(pPage->pLruNext == NULL);
pPage->pLruPrev = &(pCache->lru); pPage->pLruPrev = &(pCache->lru);
pPage->pLruNext = pCache->lru.pLruNext; pPage->pLruNext = pCache->lru.pLruNext;
pCache->lru.pLruNext->pLruPrev = pPage; pCache->lru.pLruNext->pLruPrev = pPage;
pCache->lru.pLruNext = pPage; pCache->lru.pLruNext = pPage;
}
pCache->nRecyclable++; pCache->nRecyclable++;
tdbPCacheUnlock(pCache);
} }
static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage) { static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage) {
@ -238,13 +258,14 @@ static int tdbPCacheOpenImpl(SPCache *pCache) {
// pPage->pgid = 0; // pPage->pgid = 0;
pPage->isAnchor = 0; pPage->isAnchor = 0;
pPage->isLocalPage = 1; pPage->isLocal = 1;
TDB_INIT_PAGE_REF(pPage); TDB_INIT_PAGE_REF(pPage);
pPage->pHashNext = NULL; pPage->pHashNext = NULL;
pPage->pLruNext = NULL; pPage->pLruNext = NULL;
pPage->pLruPrev = NULL; pPage->pLruPrev = NULL;
pPage->pDirtyNext = NULL; pPage->pDirtyNext = NULL;
// add page to free list
pPage->pFreeNext = pCache->pFree; pPage->pFreeNext = pCache->pFree;
pCache->pFree = pPage; pCache->pFree = pPage;
pCache->nFree++; pCache->nFree++;
@ -268,4 +289,13 @@ static int tdbPCacheOpenImpl(SPCache *pCache) {
return 0; return 0;
} }
int tdbPCacheGetPageSize(SPCache *pCache) { return pCache->pageSize; } static int tdbPCacheCloseImpl(SPCache *pCache) {
SPage *pPage;
for (pPage = pCache->pList; pPage; pPage = pCache->pList) {
pCache->pList = pPage->pCacheNext;
tdbPageDestroy(pPage, NULL, NULL);
}
tdbPCacheDestroyLock(pCache);
}

View File

@ -278,7 +278,7 @@ static int tdbPageAllocate(SPage *pPage, int szCell, SCell **ppCell) {
// 2. Try to allocate from the page free list // 2. Try to allocate from the page free list
cellFree = TDB_PAGE_FCELL(pPage); cellFree = TDB_PAGE_FCELL(pPage);
ASSERT(cellFree == 0 || cellFree > pPage->pFreeEnd - pPage->pData); ASSERT(cellFree == 0 || cellFree >= pPage->pFreeEnd - pPage->pData);
if (cellFree && pPage->pFreeEnd - pPage->pFreeStart >= TDB_PAGE_OFFSET_SIZE(pPage)) { if (cellFree && pPage->pFreeEnd - pPage->pFreeStart >= TDB_PAGE_OFFSET_SIZE(pPage)) {
SCell *pPrevFreeCell = NULL; SCell *pPrevFreeCell = NULL;
int szPrevFreeCell; int szPrevFreeCell;

View File

@ -27,7 +27,6 @@ TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct")
#define TDB_PAGE_INITIALIZED(pPage) ((pPage)->pPager != NULL) #define TDB_PAGE_INITIALIZED(pPage) ((pPage)->pPager != NULL)
static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno);
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *), void *arg, u8 loadPage); static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *), void *arg, u8 loadPage);
static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage); static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage);
static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage); static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage);
@ -77,6 +76,8 @@ int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
// pPager->jfd = -1; // pPager->jfd = -1;
pPager->pageSize = tdbPCacheGetPageSize(pCache); pPager->pageSize = tdbPCacheGetPageSize(pCache);
// pPager->dbOrigSize
ret = tdbGetFileSize(pPager->fd, pPager->pageSize, &(pPager->dbOrigSize));
*ppPager = pPager; *ppPager = pPager;
return 0; return 0;
@ -92,9 +93,15 @@ int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate) {
SPage *pPage; SPage *pPage;
int ret; int ret;
if (pPager->dbOrigSize > 0) {
pgno = 1;
} else {
pgno = 0;
}
{ {
// TODO: try to search the main DB to get the page number // TODO: try to search the main DB to get the page number
pgno = 0; // pgno = 0;
} }
// if (pgno == 0 && toCreate) { // if (pgno == 0 && toCreate) {
@ -157,7 +164,7 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) {
return 0; return 0;
} }
int tdbPagerBegin(SPager *pPager) { int tdbPagerBegin(SPager *pPager, TXN *pTxn) {
if (pPager->inTran) { if (pPager->inTran) {
return 0; return 0;
} }
@ -175,7 +182,7 @@ int tdbPagerBegin(SPager *pPager) {
return 0; return 0;
} }
int tdbPagerCommit(SPager *pPager) { int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
SPage *pPage; SPage *pPage;
int ret; int ret;
@ -204,7 +211,7 @@ int tdbPagerCommit(SPager *pPager) {
pPage->isDirty = 0; pPage->isDirty = 0;
tdbPCacheRelease(pPager->pCache, pPage); tdbPCacheRelease(pPager->pCache, pPage, pTxn);
} }
// sync the db file // sync the db file
@ -219,7 +226,8 @@ int tdbPagerCommit(SPager *pPager) {
return 0; return 0;
} }
int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg) { int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg,
TXN *pTxn) {
SPage *pPage; SPage *pPage;
SPgid pgid; SPgid pgid;
int ret; int ret;
@ -227,7 +235,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage
// Fetch a page container from the page cache // Fetch a page container from the page cache
memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN); memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN);
pgid.pgno = pgno; pgid.pgno = pgno;
pPage = tdbPCacheFetch(pPager->pCache, &pgid, 1); pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn);
if (pPage == NULL) { if (pPage == NULL) {
return -1; return -1;
} }
@ -247,7 +255,8 @@ int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage
return 0; return 0;
} }
int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg) { int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg,
TXN *pTxn) {
int ret; int ret;
SPage *pPage; SPage *pPage;
SPgid pgid; SPgid pgid;
@ -255,6 +264,7 @@ int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage
// Allocate a page number // Allocate a page number
ret = tdbPagerAllocPage(pPager, ppgno); ret = tdbPagerAllocPage(pPager, ppgno);
if (ret < 0) { if (ret < 0) {
ASSERT(0);
return -1; return -1;
} }
@ -263,8 +273,9 @@ int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage
// Fetch a page container from the page cache // Fetch a page container from the page cache
memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN); memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN);
pgid.pgno = *ppgno; pgid.pgno = *ppgno;
pPage = tdbPCacheFetch(pPager->pCache, &pgid, 1); pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn);
if (pPage == NULL) { if (pPage == NULL) {
ASSERT(0);
return -1; return -1;
} }
@ -273,6 +284,7 @@ int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage
// Initialize the page if need // Initialize the page if need
ret = tdbPagerInitPage(pPager, pPage, initPage, arg, 0); ret = tdbPagerInitPage(pPager, pPage, initPage, arg, 0);
if (ret < 0) { if (ret < 0) {
ASSERT(0);
return -1; return -1;
} }
@ -283,7 +295,7 @@ int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage
return 0; return 0;
} }
void tdbPagerReturnPage(SPager *pPager, SPage *pPage) { tdbPCacheRelease(pPager->pCache, pPage); } void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn) { tdbPCacheRelease(pPager->pCache, pPage, pTxn); }
static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno) { static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno) {
// TODO: Allocate a page from the free list // TODO: Allocate a page from the free list
@ -295,7 +307,7 @@ static int tdbPagerAllocNewPage(SPager *pPager, SPgno *ppgno) {
return 0; return 0;
} }
static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno) { int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno) {
int ret; int ret;
*ppgno = 0; *ppgno = 0;

View File

@ -15,29 +15,17 @@
#include "tdbInt.h" #include "tdbInt.h"
// int tdbTxnBegin(TENV *pEnv) { int tdbTxnOpen(TXN *pTxn, int64_t txnid, void *(*xMalloc)(void *, size_t), void (*xFree)(void *, void *), void *xArg,
// // TODO int flags) {
// return 0; // not support read-committed version at the moment
// } ASSERT(flags == 0 || flags == TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
// int tdbTxnCommit(TENV *pEnv) { pTxn->flags = flags;
// SPager *pPager = NULL; pTxn->txnId = txnid;
// int ret; pTxn->xMalloc = xMalloc;
pTxn->xFree = xFree;
pTxn->xArg = xArg;
return 0;
}
// for (;;) { int tdbTxnClose(TXN *pTxn) { return 0; }
// break;
// ret = tdbPagerCommit(pPager);
// if (ret < 0) {
// ASSERT(0);
// return -1;
// }
// }
// // TODO
// return 0;
// }
// int tdbTxnRollback(TENV *pEnv) {
// // TODO
// return 0;
// }

View File

@ -32,3 +32,16 @@ int tdbGnrtFileID(const char *fname, uint8_t *fileid, bool unique) {
return 0; return 0;
} }
int tdbGetFileSize(tdb_fd_t fd, int szPage, SPgno *size) {
int ret;
int64_t szBytes;
ret = tdbOsFileSize(fd, &szBytes);
if (ret < 0) {
return -1;
}
*size = szBytes / szPage;
return 0;
}

View File

@ -35,17 +35,18 @@ struct SBTC {
int idx; int idx;
int idxStack[BTREE_MAX_DEPTH + 1]; int idxStack[BTREE_MAX_DEPTH + 1];
SPage *pgStack[BTREE_MAX_DEPTH + 1]; SPage *pgStack[BTREE_MAX_DEPTH + 1];
TXN *pTxn;
}; };
// SBTree // SBTree
int tdbBtreeOpen(int keyLen, int valLen, SPager *pFile, FKeyComparator kcmpr, SBTree **ppBt); int tdbBtreeOpen(int keyLen, int valLen, SPager *pFile, FKeyComparator kcmpr, SBTree **ppBt);
int tdbBtreeClose(SBTree *pBt); int tdbBtreeClose(SBTree *pBt);
int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, int vLen); int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn);
int tdbBtreeGet(SBTree *pBt, const void *pKey, int kLen, void **ppVal, int *vLen); int tdbBtreeGet(SBTree *pBt, const void *pKey, int kLen, void **ppVal, int *vLen);
int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen); int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen);
// SBTC // SBTC
int tdbBtcOpen(SBTC *pCur, SBTree *pBt); int tdbBtcOpen(SBTC *pBtc, SBTree *pBt, TXN *pTxn);
int tdbBtcMoveToFirst(SBTC *pBtc); int tdbBtcMoveToFirst(SBTC *pBtc);
int tdbBtcMoveToLast(SBTC *pBtc); int tdbBtcMoveToLast(SBTC *pBtc);
int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen); int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen);

View File

@ -27,7 +27,7 @@ typedef struct STDBC TDBC;
int tdbDbOpen(const char *fname, int keyLen, int valLen, FKeyComparator keyCmprFn, TENV *pEnv, TDB **ppDb); int tdbDbOpen(const char *fname, int keyLen, int valLen, FKeyComparator keyCmprFn, TENV *pEnv, TDB **ppDb);
int tdbDbClose(TDB *pDb); int tdbDbClose(TDB *pDb);
int tdbDbDrop(TDB *pDb); int tdbDbDrop(TDB *pDb);
int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen); int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn);
int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen); int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen);
int tdbDbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen); int tdbDbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen);

View File

@ -33,9 +33,9 @@ typedef struct STEnv {
int tdbEnvOpen(const char *rootDir, int pageSize, int cacheSize, TENV **ppEnv); int tdbEnvOpen(const char *rootDir, int pageSize, int cacheSize, TENV **ppEnv);
int tdbEnvClose(TENV *pEnv); int tdbEnvClose(TENV *pEnv);
int tdbBegin(TENV *pEnv); int tdbBegin(TENV *pEnv, TXN *pTxn);
int tdbCommit(TENV *pEnv); int tdbCommit(TENV *pEnv, TXN *pTxn);
int tdbRollback(TENV *pEnv); int tdbRollback(TENV *pEnv, TXN *pTxn);
void tdbEnvAddPager(TENV *pEnv, SPager *pPager); void tdbEnvAddPager(TENV *pEnv, SPager *pPager);
void tdbEnvRemovePager(TENV *pEnv, SPager *pPager); void tdbEnvRemovePager(TENV *pEnv, SPager *pPager);

View File

@ -111,6 +111,21 @@ typedef struct SPager SPager;
typedef struct SPCache SPCache; typedef struct SPCache SPCache;
typedef struct SPage SPage; typedef struct SPage SPage;
// transaction
#define TDB_TXN_WRITE 0x1
#define TDB_TXN_READ_UNCOMMITTED 0x2
typedef struct STxn {
int flags;
i64 txnId;
void *(*xMalloc)(void *, size_t);
void (*xFree)(void *, void *);
void *xArg;
} TXN;
#define TDB_TXN_IS_WRITE(PTXN) ((PTXN)->flags & TDB_TXN_WRITE)
#define TDB_TXN_IS_READ(PTXN) (!TDB_TXN_IS_WRITE(PTXN))
#define TDB_TXN_IS_READ_UNCOMMITTED(PTXN) ((PTXN)->flags & TDB_TXN_READ_UNCOMMITTED)
#include "tdbOs.h" #include "tdbOs.h"
#include "tdbUtil.h" #include "tdbUtil.h"

View File

@ -53,6 +53,7 @@ typedef TdFilePtr tdb_fd_t;
#define tdbOsFSync taosFsyncFile #define tdbOsFSync taosFsyncFile
#define tdbOsLSeek taosLSeekFile #define tdbOsLSeek taosLSeekFile
#define tdbOsRemove remove #define tdbOsRemove remove
#define tdbOsFileSize(FD, PSIZE) taosFStatFile(FD, PSIZE, NULL)
/* directory */ /* directory */
#define tdbOsMkdir taosMkDir #define tdbOsMkdir taosMkDir
@ -110,6 +111,7 @@ i64 tdbOsWrite(tdb_fd_t fd, const void *pData, i64 nBytes);
#define tdbOsFSync fsync #define tdbOsFSync fsync
#define tdbOsLSeek lseek #define tdbOsLSeek lseek
#define tdbOsRemove remove #define tdbOsRemove remove
#define tdbOsFileSize(FD, PSIZE)
/* directory */ /* directory */
#define tdbOsMkdir mkdir #define tdbOsMkdir mkdir

View File

@ -22,9 +22,10 @@ extern "C" {
#define TDB_PCACHE_PAGE \ #define TDB_PCACHE_PAGE \
u8 isAnchor; \ u8 isAnchor; \
u8 isLocalPage; \ u8 isLocal; \
u8 isDirty; \ u8 isDirty; \
i32 nRef; \ i32 nRef; \
SPage *pCacheNext; \
SPage *pFreeNext; \ SPage *pFreeNext; \
SPage *pHashNext; \ SPage *pHashNext; \
SPage *pLruNext; \ SPage *pLruNext; \
@ -47,8 +48,8 @@ extern "C" {
int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache); int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache);
int tdbPCacheClose(SPCache *pCache); int tdbPCacheClose(SPCache *pCache);
SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, bool alcNewPage); SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn);
void tdbPCacheRelease(SPCache *pCache, SPage *pPage); void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn);
int tdbPCacheGetPageSize(SPCache *pCache); int tdbPCacheGetPageSize(SPCache *pCache);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -40,11 +40,14 @@ int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager);
int tdbPagerClose(SPager *pPager); int tdbPagerClose(SPager *pPager);
int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate); int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate);
int tdbPagerWrite(SPager *pPager, SPage *pPage); int tdbPagerWrite(SPager *pPager, SPage *pPage);
int tdbPagerBegin(SPager *pPager); int tdbPagerBegin(SPager *pPager, TXN *pTxn);
int tdbPagerCommit(SPager *pPager); int tdbPagerCommit(SPager *pPager, TXN *pTxn);
int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg); int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg,
int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg); TXN *pTxn);
void tdbPagerReturnPage(SPager *pPager, SPage *pPage); int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg,
TXN *pTxn);
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn);
int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -20,13 +20,9 @@
extern "C" { extern "C" {
#endif #endif
typedef struct STxn TXN; int tdbTxnOpen(TXN *pTxn, int64_t txnid, void *(*xMalloc)(void *, size_t), void (*xFree)(void *, void *), void *xArg,
int flags);
struct STxn { int tdbTxnClose(TXN *pTxn);
u64 txnId;
void *(*xMalloc)(void *, int);
void *xArg;
};
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -29,6 +29,7 @@ extern "C" {
#define TDB_ROUND8(x) (((x) + 7) & ~7) #define TDB_ROUND8(x) (((x) + 7) & ~7)
int tdbGnrtFileID(const char *fname, uint8_t *fileid, bool unique); int tdbGnrtFileID(const char *fname, uint8_t *fileid, bool unique);
int tdbGetFileSize(tdb_fd_t fd, int szPage, SPgno *size);
#define TDB_REALLOC(PTR, SIZE) \ #define TDB_REALLOC(PTR, SIZE) \
({ \ ({ \
@ -83,15 +84,18 @@ static inline int tdbPutVarInt(u8 *p, int v) {
static inline int tdbGetVarInt(const u8 *p, int *v) { static inline int tdbGetVarInt(const u8 *p, int *v) {
int n = 0; int n = 0;
int tv = 0; int tv = 0;
int t;
for (;;) { for (;;) {
if (p[n] <= 0x7f) { if (p[n] <= 0x7f) {
tv = (tv << 7) | p[n]; t = p[n];
tv |= (t << (7 * n));
n++; n++;
break; break;
} }
tv = (tv << 7) | (p[n] & 0x7f); t = p[n] & 0x7f;
tv |= (t << (7 * n));
n++; n++;
} }

View File

@ -1,3 +1,7 @@
# tdbTest # tdbTest
add_executable(tdbTest "tdbTest.cpp") add_executable(tdbTest "tdbTest.cpp")
target_link_libraries(tdbTest tdb gtest gtest_main) target_link_libraries(tdbTest tdb gtest gtest_main)
# tdbUtilTest
add_executable(tdbUtilTest "tdbUtilTest.cpp")
target_link_libraries(tdbUtilTest tdb gtest gtest_main)

View File

@ -19,7 +19,7 @@ static SPoolMem *openPool() {
return pPool; return pPool;
} }
static void closePool(SPoolMem *pPool) { static void clearPool(SPoolMem *pPool) {
SPoolMem *pMem; SPoolMem *pMem;
do { do {
@ -35,13 +35,14 @@ static void closePool(SPoolMem *pPool) {
} while (1); } while (1);
assert(pPool->size == 0); assert(pPool->size == 0);
}
static void closePool(SPoolMem *pPool) {
clearPool(pPool);
tdbOsFree(pPool); tdbOsFree(pPool);
} }
#define clearPool closePool static void *poolMalloc(void *arg, size_t size) {
static void *poolMalloc(void *arg, int size) {
void *ptr = NULL; void *ptr = NULL;
SPoolMem *pPool = (SPoolMem *)arg; SPoolMem *pPool = (SPoolMem *)arg;
SPoolMem *pMem; SPoolMem *pMem;
@ -118,7 +119,8 @@ TEST(tdb_test, simple_test) {
TENV *pEnv; TENV *pEnv;
TDB *pDb; TDB *pDb;
FKeyComparator compFunc; FKeyComparator compFunc;
int nData = 50000000; int nData = 10000000;
TXN txn;
// Open Env // Open Env
ret = tdbEnvOpen("tdb", 4096, 64, &pEnv); ret = tdbEnvOpen("tdb", 4096, 64, &pEnv);
@ -132,24 +134,41 @@ TEST(tdb_test, simple_test) {
{ {
char key[64]; char key[64];
char val[64]; char val[64];
int64_t poolLimit = 4096; // 1M pool limit
int64_t txnid = 0;
SPoolMem *pPool;
{ // Insert some data // open the pool
for (int i = 1; i <= nData;) { pPool = openPool();
tdbBegin(pEnv);
for (int k = 0; k < 2000; k++) { // start a transaction
sprintf(key, "key%d", i); txnid++;
sprintf(val, "value%d", i); tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
ret = tdbDbInsert(pDb, key, strlen(key), val, strlen(val)); tdbBegin(pEnv, &txn);
for (int iData = 1; iData <= nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(val, "value%d", iData);
ret = tdbDbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
i++;
}
tdbCommit(pEnv); // if pool is full, commit the transaction and start a new one
if (pPool->size >= poolLimit) {
// commit current transaction
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
// start a new transaction
clearPool(pPool);
txnid++;
tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
} }
} }
tdbCommit(pEnv); // commit the transaction
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
{ // Query the data { // Query the data
void *pVal = NULL; void *pVal = NULL;

View File

@ -0,0 +1,20 @@
#include <gtest/gtest.h>
#include "tdbInt.h"
#include <string>
TEST(tdb_util_test, simple_test) {
int vEncode = 5000;
int vDecode;
int nEncode;
int nDecode;
u8 buffer[128];
nEncode = tdbPutVarInt(buffer, vEncode);
nDecode = tdbGetVarInt(buffer, &vDecode);
GTEST_ASSERT_EQ(nEncode, nDecode);
GTEST_ASSERT_EQ(vEncode, vDecode);
}

View File

@ -206,7 +206,7 @@ if $data02 != 2678400000 then
endi endi
sql select _wstartts, count(tbcol), _wduration, _wstartts, count(*) from ct3 interval(1n, 1w) sliding(2w) sql select _wstartts, count(tbcol), _wduration, _wstartts, count(*) from ct3 interval(1n, 1w) sliding(2w)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w) sliding(2w) print ===> select _wstartts, count(tbcol), _wduration, _wstartts, count(*) from ct3 interval(1n, 1w) sliding(2w)
print ===> rows: $rows print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04 print ===> rows0: $data00 $data01 $data02 $data03 $data04
print ===> rows1: $data10 $data11 $data12 $data13 $data14 print ===> rows1: $data10 $data11 $data12 $data13 $data14
@ -219,6 +219,7 @@ if $data00 != @21-11-30 08:00:00.000@ then
return -1 return -1
endi endi
if $data01 != NULL then if $data01 != NULL then
print expect null, actual: $data01
return -1 return -1
endi endi
if $data31 != $data34 then if $data31 != $data34 then