Merge pull request #11617 from taosdata/feature/vnode_refact1
refactor: vnode
This commit is contained in:
commit
25a160dde5
|
@ -1432,7 +1432,6 @@ typedef struct {
|
|||
|
||||
typedef struct SVCreateTbReq {
|
||||
int64_t ver; // use a general definition
|
||||
char* dbFName;
|
||||
char* name;
|
||||
uint32_t ttl;
|
||||
uint32_t keep;
|
||||
|
|
|
@ -398,7 +398,6 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
|||
int32_t tlen = 0;
|
||||
|
||||
tlen += taosEncodeFixedI64(buf, pReq->ver);
|
||||
tlen += taosEncodeString(buf, pReq->dbFName);
|
||||
tlen += taosEncodeString(buf, pReq->name);
|
||||
tlen += taosEncodeFixedU32(buf, pReq->ttl);
|
||||
tlen += taosEncodeFixedU32(buf, pReq->keep);
|
||||
|
@ -467,7 +466,6 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
|||
|
||||
void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
|
||||
buf = taosDecodeFixedI64(buf, &(pReq->ver));
|
||||
buf = taosDecodeString(buf, &(pReq->dbFName));
|
||||
buf = taosDecodeString(buf, &(pReq->name));
|
||||
buf = taosDecodeFixedU32(buf, &(pReq->ttl));
|
||||
buf = taosDecodeFixedU32(buf, &(pReq->keep));
|
||||
|
|
|
@ -82,13 +82,13 @@ int32_t vmProcessGetVnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
|
|||
}
|
||||
|
||||
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
|
||||
memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));
|
||||
|
||||
pCfg->vgId = pCreate->vgId;
|
||||
strcpy(pCfg->dbname, pCreate->db);
|
||||
pCfg->wsize = pCreate->cacheBlockSize * 1024 * 1024;
|
||||
pCfg->ssize = 1024;
|
||||
pCfg->lsize = 1024 * 1024;
|
||||
pCfg->isHeapAllocator = true;
|
||||
pCfg->ttl = 4;
|
||||
pCfg->keep = pCreate->daysToKeep0;
|
||||
pCfg->streamMode = pCreate->streamMode;
|
||||
pCfg->isWeak = true;
|
||||
pCfg->tsdbCfg.keep2 = pCreate->daysToKeep0;
|
||||
|
@ -96,12 +96,6 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
|
|||
pCfg->tsdbCfg.keep1 = pCreate->daysToKeep0;
|
||||
pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize;
|
||||
pCfg->tsdbCfg.retentions = pCreate->pRetensions;
|
||||
pCfg->walCfg.level = TAOS_WAL_WRITE;
|
||||
pCfg->walCfg.fsyncPeriod = 0;
|
||||
pCfg->walCfg.retentionPeriod = 0;
|
||||
pCfg->walCfg.retentionSize = 0;
|
||||
pCfg->walCfg.rollPeriod = 0;
|
||||
pCfg->walCfg.segSize = 0;
|
||||
pCfg->walCfg.vgId = pCreate->vgId;
|
||||
pCfg->hashBegin = pCreate->hashBegin;
|
||||
pCfg->hashEnd = pCreate->hashEnd;
|
||||
|
|
|
@ -349,7 +349,6 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
|
|||
|
||||
SVCreateTbReq req = {0};
|
||||
req.ver = 0;
|
||||
req.dbFName = dbFName;
|
||||
req.name = (char *)tNameGetTableName(&name);
|
||||
req.ttl = 0;
|
||||
req.keep = 0;
|
||||
|
|
|
@ -40,6 +40,8 @@ typedef struct SVnode SVnode;
|
|||
typedef struct STsdbCfg STsdbCfg; // todo: remove
|
||||
typedef struct SVnodeCfg SVnodeCfg;
|
||||
|
||||
extern const SVnodeCfg vnodeCfgDefault;
|
||||
|
||||
int vnodeInit(int nthreads);
|
||||
void vnodeCleanup();
|
||||
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
|
||||
|
@ -133,6 +135,7 @@ struct STsdbCfg {
|
|||
|
||||
struct SVnodeCfg {
|
||||
int32_t vgId;
|
||||
char dbname[TSDB_DB_NAME_LEN];
|
||||
uint64_t dbId;
|
||||
uint64_t wsize;
|
||||
uint64_t ssize;
|
||||
|
|
|
@ -31,6 +31,8 @@ extern "C" {
|
|||
// clang-format on
|
||||
|
||||
// vnodeCfg ====================
|
||||
int vnodeEncodeConfig(const void* pObj, SJson* pJson);
|
||||
int vnodeDecodeConfig(const SJson* pJson, void* pObj);
|
||||
|
||||
// vnodeModule ====================
|
||||
int vnodeScheduleTask(int (*execute)(void*), void* arg);
|
||||
|
|
|
@ -16,13 +16,113 @@
|
|||
#include "vnodeInt.h"
|
||||
|
||||
const SVnodeCfg vnodeCfgDefault = {
|
||||
.wsize = 96 * 1024 * 1024, .ssize = 1 * 1024 * 1024, .lsize = 1024, .walCfg = {.level = TAOS_WAL_WRITE}};
|
||||
.vgId = -1,
|
||||
.dbname = "",
|
||||
.dbId = 0,
|
||||
.wsize = 96 * 1024 * 1024,
|
||||
.ssize = 1 * 1024 * 1024,
|
||||
.lsize = 1024,
|
||||
.isHeapAllocator = false,
|
||||
.ttl = 0,
|
||||
.keep = 0,
|
||||
.streamMode = 0,
|
||||
.isWeak = 0,
|
||||
.tsdbCfg = {.precision = TWO_STAGE_COMP,
|
||||
.update = 0,
|
||||
.compression = 2,
|
||||
.days = 10,
|
||||
.minRows = 100,
|
||||
.maxRows = 4096,
|
||||
.keep2 = 3650,
|
||||
.keep0 = 3650,
|
||||
.keep1 = 3650},
|
||||
.walCfg =
|
||||
{.vgId = -1, .fsyncPeriod = 0, .retentionPeriod = 0, .rollPeriod = 0, .segSize = 0, .level = TAOS_WAL_WRITE},
|
||||
.hashBegin = 0,
|
||||
.hashEnd = 0,
|
||||
.hashMethod = 0};
|
||||
|
||||
int vnodeCheckCfg(const SVnodeCfg *pCfg) {
|
||||
// TODO
|
||||
return 0;
|
||||
}
|
||||
|
||||
int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
|
||||
const SVnodeCfg *pCfg = (SVnodeCfg *)pObj;
|
||||
|
||||
if (tjsonAddIntegerToObject(pJson, "vgId", pCfg->vgId) < 0) return -1;
|
||||
if (tjsonAddStringToObject(pJson, "dbname", pCfg->dbname) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "dbId", pCfg->dbId) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "wsize", pCfg->wsize) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "ssize", pCfg->ssize) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "lsize", pCfg->lsize) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "isHeap", pCfg->isHeapAllocator) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "ttl", pCfg->ttl) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "keep", pCfg->keep) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "streamMode", pCfg->streamMode) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "isWeak", pCfg->isWeak) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "daysPerFile", pCfg->tsdbCfg.days) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "minRows", pCfg->tsdbCfg.minRows) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "maxRows", pCfg->tsdbCfg.maxRows) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "lruCacheSize", pCfg->tsdbCfg.lruCacheSize) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "wal.segSize", pCfg->walCfg.segSize) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "wal.level", pCfg->walCfg.level) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
||||
SVnodeCfg *pCfg = (SVnodeCfg *)pObj;
|
||||
|
||||
if (tjsonGetNumberValue(pJson, "vgId", pCfg->vgId) < 0) return -1;
|
||||
if (tjsonGetStringValue(pJson, "dbname", pCfg->dbname) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "dbId", pCfg->dbId) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "wsize", pCfg->wsize) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "ssize", pCfg->ssize) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "lsize", pCfg->lsize) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "isHeap", pCfg->isHeapAllocator) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "ttl", pCfg->ttl) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "keep", pCfg->keep) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "streamMode", pCfg->streamMode) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "isWeak", pCfg->isWeak) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "daysPerFile", pCfg->tsdbCfg.days) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "minRows", pCfg->tsdbCfg.minRows) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "maxRows", pCfg->tsdbCfg.maxRows) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "lruCacheSize", pCfg->tsdbCfg.lruCacheSize) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "wal.segSize", pCfg->walCfg.segSize) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "wal.level", pCfg->walCfg.level) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName) {
|
||||
uint32_t hashValue = 0;
|
||||
|
||||
|
|
|
@ -181,80 +181,6 @@ static int vnodeEndCommit(SVnode *pVnode) {
|
|||
|
||||
static FORCE_INLINE void vnodeWaitCommit(SVnode *pVnode) { tsem_wait(&pVnode->canCommit); }
|
||||
|
||||
static int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
|
||||
const SVnodeCfg *pCfg = (SVnodeCfg *)pObj;
|
||||
|
||||
if (tjsonAddIntegerToObject(pJson, "vgId", pCfg->vgId) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "dbId", pCfg->dbId) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "wsize", pCfg->wsize) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "ssize", pCfg->ssize) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "lsize", pCfg->lsize) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "isHeap", pCfg->isHeapAllocator) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "ttl", pCfg->ttl) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "keep", pCfg->keep) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "streamMode", pCfg->streamMode) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "isWeak", pCfg->isWeak) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "daysPerFile", pCfg->tsdbCfg.days) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "minRows", pCfg->tsdbCfg.minRows) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "maxRows", pCfg->tsdbCfg.maxRows) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "lruCacheSize", pCfg->tsdbCfg.lruCacheSize) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "wal.segSize", pCfg->walCfg.segSize) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "wal.level", pCfg->walCfg.level) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
||||
SVnodeCfg *pCfg = (SVnodeCfg *)pObj;
|
||||
|
||||
if (tjsonGetNumberValue(pJson, "vgId", pCfg->vgId) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "dbId", pCfg->dbId) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "wsize", pCfg->wsize) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "ssize", pCfg->ssize) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "lsize", pCfg->lsize) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "isHeap", pCfg->isHeapAllocator) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "ttl", pCfg->ttl) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "keep", pCfg->keep) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "streamMode", pCfg->streamMode) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "isWeak", pCfg->isWeak) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "daysPerFile", pCfg->tsdbCfg.days) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "minRows", pCfg->tsdbCfg.minRows) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "maxRows", pCfg->tsdbCfg.maxRows) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "lruCacheSize", pCfg->tsdbCfg.lruCacheSize) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "wal.segSize", pCfg->walCfg.segSize) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "wal.level", pCfg->walCfg.level) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int vnodeEncodeState(const void *pObj, SJson *pJson) {
|
||||
const SVState *pState = (SVState *)pObj;
|
||||
|
||||
|
|
|
@ -217,7 +217,6 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq) {
|
|||
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam->pFuncIds);
|
||||
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam);
|
||||
}
|
||||
taosMemoryFree(vCreateTbReq.dbFName);
|
||||
taosMemoryFree(vCreateTbReq.name);
|
||||
|
||||
return 0;
|
||||
|
@ -233,7 +232,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
|
|||
|
||||
char tableFName[TSDB_TABLE_FNAME_LEN];
|
||||
SMsgHead *pHead = (SMsgHead *)pMsg->pCont;
|
||||
sprintf(tableFName, "%s.%s", pCreateTbReq->dbFName, pCreateTbReq->name);
|
||||
sprintf(tableFName, "%s.%s", pVnode->config.dbname, pCreateTbReq->name);
|
||||
|
||||
int32_t code = vnodeValidateTableHash(&pVnode->config, tableFName);
|
||||
if (code) {
|
||||
|
@ -249,7 +248,6 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
|
|||
}
|
||||
// TODO: to encapsule a free API
|
||||
taosMemoryFree(pCreateTbReq->name);
|
||||
taosMemoryFree(pCreateTbReq->dbFName);
|
||||
if (pCreateTbReq->type == TD_SUPER_TABLE) {
|
||||
taosMemoryFree(pCreateTbReq->stbCfg.pSchema);
|
||||
taosMemoryFree(pCreateTbReq->stbCfg.pTagSchema);
|
||||
|
@ -298,7 +296,6 @@ static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq) {
|
|||
taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam->pFuncIds);
|
||||
taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam);
|
||||
}
|
||||
taosMemoryFree(vAlterTbReq.dbFName);
|
||||
taosMemoryFree(vAlterTbReq.name);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -15,8 +15,8 @@
|
|||
|
||||
#include "parInsertData.h"
|
||||
#include "parInt.h"
|
||||
#include "parUtil.h"
|
||||
#include "parToken.h"
|
||||
#include "parUtil.h"
|
||||
#include "tglobal.h"
|
||||
#include "ttime.h"
|
||||
#include "ttypes.h"
|
||||
|
@ -228,25 +228,23 @@ static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SToken* pTname, bool
|
|||
SName name = {0};
|
||||
createSName(&name, pTname, pBasicCtx, &pCxt->msg);
|
||||
if (isStb) {
|
||||
CHECK_CODE(catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &pCxt->pTableMeta));
|
||||
CHECK_CODE(catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name,
|
||||
&pCxt->pTableMeta));
|
||||
} else {
|
||||
CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &pCxt->pTableMeta));
|
||||
CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name,
|
||||
&pCxt->pTableMeta));
|
||||
}
|
||||
SVgroupInfo vg;
|
||||
CHECK_CODE(catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg));
|
||||
CHECK_CODE(
|
||||
catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg));
|
||||
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
|
||||
return getTableMetaImpl(pCxt, pTname, false);
|
||||
}
|
||||
|
||||
static int32_t getSTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
|
||||
return getTableMetaImpl(pCxt, pTname, true);
|
||||
}
|
||||
static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) { return getTableMetaImpl(pCxt, pTname, false); }
|
||||
|
||||
static int32_t getSTableMeta(SInsertParseContext* pCxt, SToken* pTname) { return getTableMetaImpl(pCxt, pTname, true); }
|
||||
|
||||
static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) {
|
||||
while (start < end) {
|
||||
|
@ -391,8 +389,10 @@ static int parseTime(char **end, SToken *pToken, int16_t timePrec, int64_t *time
|
|||
}
|
||||
|
||||
static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
|
||||
if ((pToken->type != TK_NOW && pToken->type != TK_TODAY && pToken->type != TK_NK_INTEGER && pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT &&
|
||||
pToken->type != TK_NK_BOOL && pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT && pToken->type != TK_NK_BIN) ||
|
||||
if ((pToken->type != TK_NOW && pToken->type != TK_TODAY && pToken->type != TK_NK_INTEGER &&
|
||||
pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT && pToken->type != TK_NK_BOOL &&
|
||||
pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT &&
|
||||
pToken->type != TK_NK_BIN) ||
|
||||
(pToken->n == 0) || (pToken->type == TK_NK_RP)) {
|
||||
return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
|
||||
}
|
||||
|
@ -448,7 +448,8 @@ static FORCE_INLINE int32_t toDouble(SToken *pToken, double *value, char **endPt
|
|||
return pToken->type;
|
||||
}
|
||||
|
||||
static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf, _row_append_fn_t func, void* param, SMsgBuf* pMsgBuf) {
|
||||
static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf,
|
||||
_row_append_fn_t func, void* param, SMsgBuf* pMsgBuf) {
|
||||
int64_t iv;
|
||||
char* endptr = NULL;
|
||||
bool isSigned = false;
|
||||
|
@ -571,7 +572,8 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int
|
|||
if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
|
||||
return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
|
||||
}
|
||||
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) {
|
||||
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) ||
|
||||
isnan(dv)) {
|
||||
return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
|
||||
}
|
||||
float tmpVal = (float)dv;
|
||||
|
@ -749,7 +751,8 @@ static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void *value, int32_t len, voi
|
|||
if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(pa->buf), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
|
||||
char buf[512] = {0};
|
||||
snprintf(buf, tListLen(buf), "%s", strerror(errno));
|
||||
return buildSyntaxErrMsg(pMsgBuf, buf, value);;
|
||||
return buildSyntaxErrMsg(pMsgBuf, buf, value);
|
||||
;
|
||||
}
|
||||
|
||||
varDataSetLen(pa->buf, output);
|
||||
|
@ -765,7 +768,6 @@ static int32_t buildCreateTbReq(SInsertParseContext* pCxt, const SName* pName, S
|
|||
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
||||
tNameGetFullDbName(pName, dbFName);
|
||||
pCxt->createTblReq.type = TD_CHILD_TABLE;
|
||||
pCxt->createTblReq.dbFName = strdup(dbFName);
|
||||
pCxt->createTblReq.name = strdup(pName->tname);
|
||||
pCxt->createTblReq.ctbCfg.suid = pCxt->pTableMeta->suid;
|
||||
pCxt->createTblReq.ctbCfg.pTag = row;
|
||||
|
@ -786,7 +788,8 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
|
|||
NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
|
||||
SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i] - 1]; // colId starts with 1
|
||||
param.schema = pTagSchema;
|
||||
CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pTagSchema, precision, tmpTokenBuf, KvRowAppend, ¶m, &pCxt->msg));
|
||||
CHECK_CODE(
|
||||
parseValueToken(&pCxt->pSql, &sToken, pTagSchema, precision, tmpTokenBuf, KvRowAppend, ¶m, &pCxt->msg));
|
||||
}
|
||||
|
||||
SKVRow row = tdGetKVRowFromBuilder(&pCxt->tagsBuilder);
|
||||
|
@ -864,7 +867,8 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken)
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, int16_t timePrec, int32_t* len, char* tmpTokenBuf) {
|
||||
static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, int16_t timePrec, int32_t* len,
|
||||
char* tmpTokenBuf) {
|
||||
SParsedDataColInfo* spd = &pDataBlocks->boundColumnInfo;
|
||||
SRowBuilder* pBuilder = &pDataBlocks->rowBuilder;
|
||||
STSRow* row = (STSRow*)(pDataBlocks->pData + pDataBlocks->size); // skip the SSubmitBlk header
|
||||
|
@ -968,7 +972,6 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* da
|
|||
}
|
||||
|
||||
static void destroyCreateSubTbReq(SVCreateTbReq* pReq) {
|
||||
taosMemoryFreeClear(pReq->dbFName);
|
||||
taosMemoryFreeClear(pReq->name);
|
||||
taosMemoryFreeClear(pReq->ctbCfg.pTag);
|
||||
}
|
||||
|
@ -1023,7 +1026,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
|||
// no data in the sql string anymore.
|
||||
if (sToken.n == 0) {
|
||||
if (0 == pCxt->totalNum) {
|
||||
return buildInvalidOperationMsg(&pCxt->msg, "no data in sql");;
|
||||
return buildInvalidOperationMsg(&pCxt->msg, "no data in sql");
|
||||
;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -1041,7 +1045,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
|||
|
||||
STableDataBlocks* dataBuf = NULL;
|
||||
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
|
||||
sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL, &pCxt->createTblReq));
|
||||
sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta,
|
||||
&dataBuf, NULL, &pCxt->createTblReq));
|
||||
|
||||
if (TK_NK_LP == sToken.type) {
|
||||
// pSql -> field1_name, ...)
|
||||
|
@ -1071,7 +1076,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
|||
return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z);
|
||||
}
|
||||
// merge according to vgId
|
||||
if (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
|
||||
if (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) &&
|
||||
taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
|
||||
CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks));
|
||||
}
|
||||
return buildOutput(pCxt);
|
||||
|
@ -1093,11 +1099,10 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
|
|||
.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false),
|
||||
.pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, false),
|
||||
.totalNum = 0,
|
||||
.pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT)
|
||||
};
|
||||
.pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT)};
|
||||
|
||||
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj ||
|
||||
NULL == context.pSubTableHashObj || NULL == context.pOutput) {
|
||||
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pSubTableHashObj ||
|
||||
NULL == context.pOutput) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
|
|
@ -2747,7 +2747,6 @@ static void toSchemaEx(const SColumnDefNode* pCol, col_id_t colId, SSchema* pSch
|
|||
}
|
||||
|
||||
static void destroyCreateTbReq(SVCreateTbReq* pReq) {
|
||||
taosMemoryFreeClear(pReq->dbFName);
|
||||
taosMemoryFreeClear(pReq->name);
|
||||
taosMemoryFreeClear(pReq->ntbCfg.pSchema);
|
||||
}
|
||||
|
@ -2784,7 +2783,6 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
|
|||
|
||||
SVCreateTbReq req = {0};
|
||||
req.type = TD_NORMAL_TABLE;
|
||||
req.dbFName = strdup(dbFName);
|
||||
req.name = strdup(pStmt->tableName);
|
||||
req.ntbCfg.nCols = LIST_LENGTH(pStmt->pCols);
|
||||
req.ntbCfg.pSchema = taosMemoryCalloc(req.ntbCfg.nCols, sizeof(SSchema));
|
||||
|
@ -2843,7 +2841,6 @@ static void destroyCreateTbReqBatch(SVgroupTablesBatch* pTbBatch) {
|
|||
size_t size = taosArrayGetSize(pTbBatch->req.pArray);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SVCreateTbReq* pTableReq = taosArrayGet(pTbBatch->req.pArray, i);
|
||||
taosMemoryFreeClear(pTableReq->dbFName);
|
||||
taosMemoryFreeClear(pTableReq->name);
|
||||
|
||||
if (pTableReq->type == TSDB_NORMAL_TABLE) {
|
||||
|
@ -2929,7 +2926,6 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, c
|
|||
|
||||
struct SVCreateTbReq req = {0};
|
||||
req.type = TD_CHILD_TABLE;
|
||||
req.dbFName = strdup(dbFName);
|
||||
req.name = strdup(pTableName);
|
||||
req.ctbCfg.suid = suid;
|
||||
req.ctbCfg.pTag = row;
|
||||
|
|
Loading…
Reference in New Issue