Merge pull request #12484 from taosdata/feat/alter_table
feat: alter table
This commit is contained in:
commit
f4ecfcad7d
|
@ -78,6 +78,12 @@ option(
|
||||||
OFF
|
OFF
|
||||||
)
|
)
|
||||||
|
|
||||||
|
option(
|
||||||
|
BUILD_WITH_BDB
|
||||||
|
"If build with BDB"
|
||||||
|
OFF
|
||||||
|
)
|
||||||
|
|
||||||
option(
|
option(
|
||||||
BUILD_WITH_LUCENE
|
BUILD_WITH_LUCENE
|
||||||
"If build with lucene"
|
"If build with lucene"
|
||||||
|
|
|
@ -78,9 +78,9 @@ if(${BUILD_WITH_UV})
|
||||||
endif(${BUILD_WITH_UV})
|
endif(${BUILD_WITH_UV})
|
||||||
|
|
||||||
# bdb
|
# bdb
|
||||||
#if(${BUILD_WITH_BDB})
|
if(${BUILD_WITH_BDB})
|
||||||
#cat("${TD_SUPPORT_DIR}/bdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
cat("${TD_SUPPORT_DIR}/bdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||||
#endif(${BUILD_WITH_BDB})
|
endif(${BUILD_WITH_BDB})
|
||||||
|
|
||||||
# sqlite
|
# sqlite
|
||||||
if(${BUILD_WITH_SQLITE})
|
if(${BUILD_WITH_SQLITE})
|
||||||
|
|
|
@ -7,9 +7,9 @@ if(${BUILD_WITH_LUCENE})
|
||||||
add_subdirectory(lucene)
|
add_subdirectory(lucene)
|
||||||
endif(${BUILD_WITH_LUCENE})
|
endif(${BUILD_WITH_LUCENE})
|
||||||
|
|
||||||
#if(${BUILD_WITH_BDB})
|
if(${BUILD_WITH_BDB})
|
||||||
#add_subdirectory(bdb)
|
add_subdirectory(bdb)
|
||||||
#endif(${BUILD_WITH_BDB})
|
endif(${BUILD_WITH_BDB})
|
||||||
|
|
||||||
if(${BUILD_WITH_SQLITE})
|
if(${BUILD_WITH_SQLITE})
|
||||||
add_subdirectory(sqlite)
|
add_subdirectory(sqlite)
|
||||||
|
|
|
@ -1670,6 +1670,7 @@ typedef struct SVDropStbReq {
|
||||||
int32_t tEncodeSVDropStbReq(SEncoder* pCoder, const SVDropStbReq* pReq);
|
int32_t tEncodeSVDropStbReq(SEncoder* pCoder, const SVDropStbReq* pReq);
|
||||||
int32_t tDecodeSVDropStbReq(SDecoder* pCoder, SVDropStbReq* pReq);
|
int32_t tDecodeSVDropStbReq(SDecoder* pCoder, SVDropStbReq* pReq);
|
||||||
|
|
||||||
|
// TDMT_VND_CREATE_TABLE ==============
|
||||||
#define TD_CREATE_IF_NOT_EXISTS 0x1
|
#define TD_CREATE_IF_NOT_EXISTS 0x1
|
||||||
typedef struct SVCreateTbReq {
|
typedef struct SVCreateTbReq {
|
||||||
int32_t flags;
|
int32_t flags;
|
||||||
|
@ -1759,6 +1760,45 @@ typedef struct {
|
||||||
int32_t tEncodeSVDropTbBatchRsp(SEncoder* pCoder, const SVDropTbBatchRsp* pRsp);
|
int32_t tEncodeSVDropTbBatchRsp(SEncoder* pCoder, const SVDropTbBatchRsp* pRsp);
|
||||||
int32_t tDecodeSVDropTbBatchRsp(SDecoder* pCoder, SVDropTbBatchRsp* pRsp);
|
int32_t tDecodeSVDropTbBatchRsp(SDecoder* pCoder, SVDropTbBatchRsp* pRsp);
|
||||||
|
|
||||||
|
// TDMT_VND_ALTER_TABLE =====================
|
||||||
|
typedef struct {
|
||||||
|
const char* tbName;
|
||||||
|
int8_t action;
|
||||||
|
// TSDB_ALTER_TABLE_ADD_COLUMN
|
||||||
|
int8_t type;
|
||||||
|
int32_t bytes;
|
||||||
|
const char* colAddName;
|
||||||
|
// TSDB_ALTER_TABLE_DROP_COLUMN
|
||||||
|
const char* colDropName;
|
||||||
|
// TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
|
||||||
|
const char* colModName;
|
||||||
|
int32_t colModBytes;
|
||||||
|
// TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME
|
||||||
|
const char* colOldName;
|
||||||
|
const char* colNewName;
|
||||||
|
// TSDB_ALTER_TABLE_UPDATE_TAG_VAL
|
||||||
|
const char* tagName;
|
||||||
|
int8_t isNull;
|
||||||
|
uint32_t nTagVal;
|
||||||
|
const uint8_t* pTagVal;
|
||||||
|
// TSDB_ALTER_TABLE_UPDATE_OPTIONS
|
||||||
|
int8_t updateTTL;
|
||||||
|
int32_t newTTL;
|
||||||
|
int8_t updateComment;
|
||||||
|
const char* newComment;
|
||||||
|
} SVAlterTbReq;
|
||||||
|
|
||||||
|
int32_t tEncodeSVAlterTbReq(SEncoder* pEncoder, const SVAlterTbReq* pReq);
|
||||||
|
int32_t tDecodeSVAlterTbReq(SDecoder* pDecoder, SVAlterTbReq* pReq);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t code;
|
||||||
|
} SVAlterTbRsp;
|
||||||
|
|
||||||
|
int32_t tEncodeSVAlterTbRsp(SEncoder* pEncoder, const SVAlterTbRsp* pRsp);
|
||||||
|
int32_t tDecodeSVAlterTbRsp(SDecoder* pDecoder, SVAlterTbRsp* pRsp);
|
||||||
|
// ======================
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMsgHead head;
|
SMsgHead head;
|
||||||
int64_t uid;
|
int64_t uid;
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
|
|
||||||
#include "tcoding.h"
|
#include "tcoding.h"
|
||||||
#include "tlist.h"
|
#include "tlist.h"
|
||||||
// #include "tfreelist.h"
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
|
|
@ -1,60 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef _TD_UTIL_FREELIST_H_
|
|
||||||
#define _TD_UTIL_FREELIST_H_
|
|
||||||
|
|
||||||
#include "tlist.h"
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
struct SFreeListNode {
|
|
||||||
TD_SLIST_NODE(SFreeListNode);
|
|
||||||
char payload[];
|
|
||||||
};
|
|
||||||
|
|
||||||
typedef TD_SLIST(SFreeListNode) SFreeList;
|
|
||||||
|
|
||||||
#define TFL_MALLOC(PTR, TYPE, SIZE, LIST) \
|
|
||||||
do { \
|
|
||||||
void *ptr = taosMemoryMalloc((SIZE) + sizeof(struct SFreeListNode)); \
|
|
||||||
if (ptr) { \
|
|
||||||
TD_SLIST_PUSH((LIST), (struct SFreeListNode *)ptr); \
|
|
||||||
ptr = ((struct SFreeListNode *)ptr)->payload; \
|
|
||||||
(PTR) = (TYPE)(ptr); \
|
|
||||||
}else{ \
|
|
||||||
(PTR) = NULL; \
|
|
||||||
} \
|
|
||||||
}while(0);
|
|
||||||
|
|
||||||
#define tFreeListInit(pFL) TD_SLIST_INIT(pFL)
|
|
||||||
|
|
||||||
static FORCE_INLINE void tFreeListClear(SFreeList *pFL) {
|
|
||||||
struct SFreeListNode *pNode;
|
|
||||||
for (;;) {
|
|
||||||
pNode = TD_SLIST_HEAD(pFL);
|
|
||||||
if (pNode == NULL) break;
|
|
||||||
TD_SLIST_POP(pFL);
|
|
||||||
taosMemoryFree(pNode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif /*_TD_UTIL_FREELIST_H_*/
|
|
|
@ -4163,3 +4163,111 @@ void tFreeSSubmitRsp(SSubmitRsp *pRsp) {
|
||||||
|
|
||||||
taosMemoryFree(pRsp);
|
taosMemoryFree(pRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) {
|
||||||
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
|
||||||
|
if (tEncodeCStr(pEncoder, pReq->tbName) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pReq->action) < 0) return -1;
|
||||||
|
switch (pReq->action) {
|
||||||
|
case TSDB_ALTER_TABLE_ADD_COLUMN:
|
||||||
|
if (tEncodeI8(pEncoder, pReq->type) < 0) return -1;
|
||||||
|
if (tEncodeI32v(pEncoder, pReq->bytes) < 0) return -1;
|
||||||
|
if (tEncodeCStr(pEncoder, pReq->colAddName) < 0) return -1;
|
||||||
|
break;
|
||||||
|
case TSDB_ALTER_TABLE_DROP_COLUMN:
|
||||||
|
if (tEncodeCStr(pEncoder, pReq->colDropName) < 0) return -1;
|
||||||
|
break;
|
||||||
|
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
|
||||||
|
if (tEncodeCStr(pEncoder, pReq->colModName) < 0) return -1;
|
||||||
|
if (tEncodeI32v(pEncoder, pReq->colModBytes) < 0) return -1;
|
||||||
|
break;
|
||||||
|
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:
|
||||||
|
if (tEncodeCStr(pEncoder, pReq->colOldName) < 0) return -1;
|
||||||
|
if (tEncodeCStr(pEncoder, pReq->colNewName) < 0) return -1;
|
||||||
|
break;
|
||||||
|
case TSDB_ALTER_TABLE_UPDATE_TAG_VAL:
|
||||||
|
if (tEncodeCStr(pEncoder, pReq->tagName) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pReq->isNull) < 0) return -1;
|
||||||
|
if (!pReq->isNull) {
|
||||||
|
if (tEncodeBinary(pEncoder, pReq->pTagVal, pReq->nTagVal) < 0) return -1;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case TSDB_ALTER_TABLE_UPDATE_OPTIONS:
|
||||||
|
if (tEncodeI8(pEncoder, pReq->updateTTL) < 0) return -1;
|
||||||
|
if (pReq->updateTTL) {
|
||||||
|
if (tEncodeI32v(pEncoder, pReq->newTTL) < 0) return -1;
|
||||||
|
}
|
||||||
|
if (tEncodeI8(pEncoder, pReq->updateComment) < 0) return -1;
|
||||||
|
if (pReq->updateComment) {
|
||||||
|
if (tEncodeCStr(pEncoder, pReq->newComment) < 0) return -1;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
tEndEncode(pEncoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) {
|
||||||
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
|
|
||||||
|
if (tDecodeCStr(pDecoder, &pReq->tbName) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &pReq->action) < 0) return -1;
|
||||||
|
switch (pReq->action) {
|
||||||
|
case TSDB_ALTER_TABLE_ADD_COLUMN:
|
||||||
|
if (tDecodeI8(pDecoder, &pReq->type) < 0) return -1;
|
||||||
|
if (tDecodeI32v(pDecoder, &pReq->bytes) < 0) return -1;
|
||||||
|
if (tDecodeCStr(pDecoder, &pReq->colAddName) < 0) return -1;
|
||||||
|
break;
|
||||||
|
case TSDB_ALTER_TABLE_DROP_COLUMN:
|
||||||
|
if (tDecodeCStr(pDecoder, &pReq->colDropName) < 0) return -1;
|
||||||
|
break;
|
||||||
|
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
|
||||||
|
if (tDecodeCStr(pDecoder, &pReq->colModName) < 0) return -1;
|
||||||
|
if (tDecodeI32v(pDecoder, &pReq->colModBytes) < 0) return -1;
|
||||||
|
break;
|
||||||
|
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:
|
||||||
|
if (tDecodeCStr(pDecoder, &pReq->colOldName) < 0) return -1;
|
||||||
|
if (tDecodeCStr(pDecoder, &pReq->colNewName) < 0) return -1;
|
||||||
|
break;
|
||||||
|
case TSDB_ALTER_TABLE_UPDATE_TAG_VAL:
|
||||||
|
if (tDecodeCStr(pDecoder, &pReq->tagName) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &pReq->isNull) < 0) return -1;
|
||||||
|
if (!pReq->isNull) {
|
||||||
|
if (tDecodeBinary(pDecoder, &pReq->pTagVal, &pReq->nTagVal) < 0) return -1;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case TSDB_ALTER_TABLE_UPDATE_OPTIONS:
|
||||||
|
if (tDecodeI8(pDecoder, &pReq->updateTTL) < 0) return -1;
|
||||||
|
if (pReq->updateTTL) {
|
||||||
|
if (tDecodeI32v(pDecoder, &pReq->newTTL) < 0) return -1;
|
||||||
|
}
|
||||||
|
if (tDecodeI8(pDecoder, &pReq->updateComment) < 0) return -1;
|
||||||
|
if (pReq->updateComment) {
|
||||||
|
if (tDecodeCStr(pDecoder, &pReq->newComment) < 0) return -1;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
tEndDecode(pDecoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeSVAlterTbRsp(SEncoder *pEncoder, const SVAlterTbRsp *pRsp) {
|
||||||
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pRsp->code) < 0) return -1;
|
||||||
|
tEndEncode(pEncoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeSVAlterTbRsp(SDecoder *pDecoder, SVAlterTbRsp *pRsp) {
|
||||||
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->code) < 0) return -1;
|
||||||
|
tEndDecode(pDecoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
|
@ -383,9 +383,10 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
|
||||||
req.suid = pStb->uid;
|
req.suid = pStb->uid;
|
||||||
req.rollup = pStb->ast1Len > 0 ? 1 : 0;
|
req.rollup = pStb->ast1Len > 0 ? 1 : 0;
|
||||||
req.schema.nCols = pStb->numOfColumns;
|
req.schema.nCols = pStb->numOfColumns;
|
||||||
req.schema.sver = 0;
|
req.schema.sver = pStb->version;
|
||||||
req.schema.pSchema = pStb->pColumns;
|
req.schema.pSchema = pStb->pColumns;
|
||||||
req.schemaTag.nCols = pStb->numOfTags;
|
req.schemaTag.nCols = pStb->numOfTags;
|
||||||
|
req.schemaTag.nCols = 0;
|
||||||
req.schemaTag.pSchema = pStb->pTags;
|
req.schemaTag.pSchema = pStb->pTags;
|
||||||
|
|
||||||
if (req.rollup) {
|
if (req.rollup) {
|
||||||
|
|
|
@ -79,6 +79,7 @@ int metaClose(SMeta* pMeta);
|
||||||
int metaBegin(SMeta* pMeta);
|
int metaBegin(SMeta* pMeta);
|
||||||
int metaCommit(SMeta* pMeta);
|
int metaCommit(SMeta* pMeta);
|
||||||
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
||||||
|
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
||||||
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
|
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
|
||||||
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
|
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
|
||||||
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq);
|
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq);
|
||||||
|
|
|
@ -131,6 +131,75 @@ _err:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
|
SMetaEntry oStbEntry = {0};
|
||||||
|
SMetaEntry nStbEntry = {0};
|
||||||
|
TDBC *pUidIdxc = NULL;
|
||||||
|
TDBC *pTbDbc = NULL;
|
||||||
|
const void *pData;
|
||||||
|
int nData;
|
||||||
|
int64_t oversion;
|
||||||
|
SDecoder dc = {0};
|
||||||
|
int32_t ret;
|
||||||
|
int32_t c;
|
||||||
|
|
||||||
|
tdbDbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
|
||||||
|
ret = tdbDbcMoveTo(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &c);
|
||||||
|
if (ret < 0 || c) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = tdbDbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
|
||||||
|
if (ret < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
oversion = *(int64_t *)pData;
|
||||||
|
|
||||||
|
tdbDbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
|
||||||
|
ret = tdbDbcMoveTo(pTbDbc, &((STbDbKey){.uid = pReq->suid, .version = oversion}), sizeof(STbDbKey), &c);
|
||||||
|
ASSERT(ret == 0 && c == 0);
|
||||||
|
|
||||||
|
ret = tdbDbcGet(pTbDbc, NULL, NULL, &pData, &nData);
|
||||||
|
ASSERT(ret == 0);
|
||||||
|
|
||||||
|
tDecoderInit(&dc, pData, nData);
|
||||||
|
metaDecodeEntry(&dc, &oStbEntry);
|
||||||
|
|
||||||
|
nStbEntry.version = version;
|
||||||
|
nStbEntry.type = TSDB_SUPER_TABLE;
|
||||||
|
nStbEntry.uid = pReq->suid;
|
||||||
|
nStbEntry.name = pReq->name;
|
||||||
|
nStbEntry.stbEntry.schema = pReq->schema;
|
||||||
|
nStbEntry.stbEntry.schemaTag = pReq->schemaTag;
|
||||||
|
|
||||||
|
metaWLock(pMeta);
|
||||||
|
// compare two entry
|
||||||
|
if (oStbEntry.stbEntry.schema.sver != pReq->schema.sver) {
|
||||||
|
if (oStbEntry.stbEntry.schema.nCols != pReq->schema.nCols) {
|
||||||
|
metaSaveToSkmDb(pMeta, &nStbEntry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// if (oStbEntry.stbEntry.schemaTag.sver != pReq->schemaTag.sver) {
|
||||||
|
// // change tag schema
|
||||||
|
// }
|
||||||
|
|
||||||
|
// update table.db
|
||||||
|
metaSaveToTbDb(pMeta, &nStbEntry);
|
||||||
|
|
||||||
|
// update uid index
|
||||||
|
tdbDbcUpsert(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &version, sizeof(version), 0);
|
||||||
|
|
||||||
|
metaULock(pMeta);
|
||||||
|
tDecoderClear(&dc);
|
||||||
|
tdbDbcClose(pTbDbc);
|
||||||
|
tdbDbcClose(pUidIdxc);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
|
int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
|
||||||
SMetaEntry me = {0};
|
SMetaEntry me = {0};
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
|
|
|
@ -91,7 +91,7 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
|
||||||
// TODO set to real sversion
|
// TODO set to real sversion
|
||||||
*pUid = 0;
|
*pUid = 0;
|
||||||
|
|
||||||
int32_t sversion = 0;
|
int32_t sversion = 1;
|
||||||
if (pHandle->sver != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) {
|
if (pHandle->sver != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) {
|
||||||
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion);
|
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion);
|
||||||
|
|
||||||
|
|
|
@ -465,7 +465,7 @@ static int tsdbCreateCommitIters(SCommitH *pCommith) {
|
||||||
pTbData = (STbData *)pNode->pData;
|
pTbData = (STbData *)pNode->pData;
|
||||||
|
|
||||||
pCommitIter = pCommith->iters + i;
|
pCommitIter = pCommith->iters + i;
|
||||||
pTSchema = metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, 0); // TODO: schema version
|
pTSchema = metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, 1); // TODO: schema version
|
||||||
|
|
||||||
if (pTSchema) {
|
if (pTSchema) {
|
||||||
pCommitIter->pIter = tSkipListCreateIter(pTbData->pData);
|
pCommitIter->pIter = tSkipListCreateIter(pTbData->pData);
|
||||||
|
@ -912,7 +912,7 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) {
|
||||||
while (bidx < nBlocks) {
|
while (bidx < nBlocks) {
|
||||||
if (!pTSchema && !tsdbCommitIsSameFile(pCommith, bidx)) {
|
if (!pTSchema && !tsdbCommitIsSameFile(pCommith, bidx)) {
|
||||||
// Set commit table
|
// Set commit table
|
||||||
pTSchema = metaGetTbTSchema(REPO_META(pTsdb), pIdx->uid, 0); // TODO: schema version
|
pTSchema = metaGetTbTSchema(REPO_META(pTsdb), pIdx->uid, 1); // TODO: schema version
|
||||||
if (!pTSchema) {
|
if (!pTSchema) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -490,7 +490,7 @@ tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableG
|
||||||
|
|
||||||
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0);
|
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0);
|
||||||
|
|
||||||
pTsdbReadHandle->pSchema = metaGetTbTSchema(pVnode->pMeta, pCheckInfo->tableId, 0);
|
pTsdbReadHandle->pSchema = metaGetTbTSchema(pVnode->pMeta, pCheckInfo->tableId, 1);
|
||||||
int32_t numOfCols = taosArrayGetSize(pTsdbReadHandle->suppInfo.defaultLoadColumn);
|
int32_t numOfCols = taosArrayGetSize(pTsdbReadHandle->suppInfo.defaultLoadColumn);
|
||||||
int16_t* ids = pTsdbReadHandle->suppInfo.defaultLoadColumn->pData;
|
int16_t* ids = pTsdbReadHandle->suppInfo.defaultLoadColumn->pData;
|
||||||
|
|
||||||
|
@ -1618,7 +1618,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
|
||||||
if (pSchema1 == NULL) {
|
if (pSchema1 == NULL) {
|
||||||
// pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row1));
|
// pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row1));
|
||||||
// TODO: use the real schemaVersion
|
// TODO: use the real schemaVersion
|
||||||
pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, 0);
|
pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef TD_DEBUG_PRINT_ROW
|
#ifdef TD_DEBUG_PRINT_ROW
|
||||||
|
@ -1637,7 +1637,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
|
||||||
if (pSchema2 == NULL) {
|
if (pSchema2 == NULL) {
|
||||||
// pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row2));
|
// pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row2));
|
||||||
// TODO: use the real schemaVersion
|
// TODO: use the real schemaVersion
|
||||||
pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, 0);
|
pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, 1);
|
||||||
}
|
}
|
||||||
if (isRow2DataRow) {
|
if (isRow2DataRow) {
|
||||||
numOfColsOfRow2 = schemaNCols(pSchema2);
|
numOfColsOfRow2 = schemaNCols(pSchema2);
|
||||||
|
@ -2755,7 +2755,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
|
||||||
|
|
||||||
win->ekey = key;
|
win->ekey = key;
|
||||||
if (rv != TD_ROW_SVER(row)) {
|
if (rv != TD_ROW_SVER(row)) {
|
||||||
pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 0);
|
pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 1);
|
||||||
rv = TD_ROW_SVER(row);
|
rv = TD_ROW_SVER(row);
|
||||||
}
|
}
|
||||||
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId,
|
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId,
|
||||||
|
@ -3877,7 +3877,7 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch
|
||||||
|
|
||||||
// NOTE: not add ref count for super table
|
// NOTE: not add ref count for super table
|
||||||
SArray* res = taosArrayInit(8, sizeof(STableKeyInfo));
|
SArray* res = taosArrayInit(8, sizeof(STableKeyInfo));
|
||||||
SSchemaWrapper* pTagSchema = metaGetTableSchema(pMeta, uid, 0, true);
|
SSchemaWrapper* pTagSchema = metaGetTableSchema(pMeta, uid, 1, true);
|
||||||
|
|
||||||
// no tags and tbname condition, all child tables of this stable are involved
|
// no tags and tbname condition, all child tables of this stable are involved
|
||||||
if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
|
if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
|
||||||
|
|
|
@ -2084,7 +2084,7 @@ static int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType
|
||||||
|
|
||||||
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
|
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
|
||||||
// TODO: use the proper schema instead of 0, and cache STSchema in cache
|
// TODO: use the proper schema instead of 0, and cache STSchema in cache
|
||||||
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, suid, 0);
|
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, suid, 1);
|
||||||
if (!pTSchema) {
|
if (!pTSchema) {
|
||||||
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
|
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
|
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
|
||||||
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
|
static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
|
||||||
static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
|
@ -73,7 +73,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
|
||||||
if (vnodeProcessCreateStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
if (vnodeProcessCreateStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_ALTER_STB:
|
case TDMT_VND_ALTER_STB:
|
||||||
if (vnodeProcessAlterStbReq(pVnode, pReq, len, pRsp) < 0) goto _err;
|
if (vnodeProcessAlterStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_DROP_STB:
|
case TDMT_VND_DROP_STB:
|
||||||
if (vnodeProcessDropStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
if (vnodeProcessDropStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||||
|
@ -397,20 +397,32 @@ _exit:
|
||||||
return rcode;
|
return rcode;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
// ASSERT(0);
|
SVCreateStbReq req = {0};
|
||||||
#if 0
|
SDecoder dc = {0};
|
||||||
SVCreateTbReq vAlterTbReq = {0};
|
|
||||||
vTrace("vgId:%d, process alter stb req", TD_VID(pVnode));
|
pRsp->msgType = TDMT_VND_ALTER_STB_RSP;
|
||||||
tDeserializeSVCreateTbReq(pReq, &vAlterTbReq);
|
pRsp->code = TSDB_CODE_SUCCESS;
|
||||||
// TODO: to encapsule a free API
|
pRsp->pCont = NULL;
|
||||||
taosMemoryFree(vAlterTbReq.stbCfg.pSchema);
|
pRsp->contLen = 0;
|
||||||
taosMemoryFree(vAlterTbReq.stbCfg.pTagSchema);
|
|
||||||
if (vAlterTbReq.stbCfg.pRSmaParam) {
|
tDecoderInit(&dc, pReq, len);
|
||||||
taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam);
|
|
||||||
|
// decode req
|
||||||
|
if (tDecodeSVCreateStbReq(&dc, &req) < 0) {
|
||||||
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
tDecoderClear(&dc);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
taosMemoryFree(vAlterTbReq.name);
|
|
||||||
#endif
|
if (metaAlterSTable(pVnode->pMeta, version, &req) < 0) {
|
||||||
|
pRsp->code = terrno;
|
||||||
|
tDecoderClear(&dc);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tDecoderClear(&dc);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -513,7 +525,7 @@ static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSub
|
||||||
if (pSchema) {
|
if (pSchema) {
|
||||||
taosMemoryFreeClear(pSchema);
|
taosMemoryFreeClear(pSchema);
|
||||||
}
|
}
|
||||||
pSchema = metaGetTbTSchema(pMeta, msgIter->suid, 0); // TODO: use the real schema
|
pSchema = metaGetTbTSchema(pMeta, msgIter->suid, 1); // TODO: use the real schema
|
||||||
if (pSchema) {
|
if (pSchema) {
|
||||||
suid = msgIter->suid;
|
suid = msgIter->suid;
|
||||||
}
|
}
|
||||||
|
|
|
@ -3725,7 +3725,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
|
||||||
req.type = TD_NORMAL_TABLE;
|
req.type = TD_NORMAL_TABLE;
|
||||||
req.name = strdup(pStmt->tableName);
|
req.name = strdup(pStmt->tableName);
|
||||||
req.ntb.schema.nCols = LIST_LENGTH(pStmt->pCols);
|
req.ntb.schema.nCols = LIST_LENGTH(pStmt->pCols);
|
||||||
req.ntb.schema.sver = 0;
|
req.ntb.schema.sver = 1;
|
||||||
req.ntb.schema.pSchema = taosMemoryCalloc(req.ntb.schema.nCols, sizeof(SSchema));
|
req.ntb.schema.pSchema = taosMemoryCalloc(req.ntb.schema.nCols, sizeof(SSchema));
|
||||||
if (NULL == req.name || NULL == req.ntb.schema.pSchema) {
|
if (NULL == req.name || NULL == req.ntb.schema.pSchema) {
|
||||||
destroyCreateTbReq(&req);
|
destroyCreateTbReq(&req);
|
||||||
|
|
|
@ -33,13 +33,13 @@ ENDIF()
|
||||||
|
|
||||||
INCLUDE_DIRECTORIES(${TD_SOURCE_DIR}/src/util/inc)
|
INCLUDE_DIRECTORIES(${TD_SOURCE_DIR}/src/util/inc)
|
||||||
|
|
||||||
# freelistTest
|
# # freelistTest
|
||||||
add_executable(freelistTest "")
|
# add_executable(freelistTest "")
|
||||||
target_sources(freelistTest
|
# target_sources(freelistTest
|
||||||
PRIVATE
|
# PRIVATE
|
||||||
"freelistTest.cpp"
|
# "freelistTest.cpp"
|
||||||
)
|
# )
|
||||||
target_link_libraries(freelistTest os util gtest gtest_main)
|
# target_link_libraries(freelistTest os util gtest gtest_main)
|
||||||
|
|
||||||
# # encodeTest
|
# # encodeTest
|
||||||
# add_executable(encodeTest "encodeTest.cpp")
|
# add_executable(encodeTest "encodeTest.cpp")
|
||||||
|
|
|
@ -1,17 +0,0 @@
|
||||||
#include <gtest/gtest.h>
|
|
||||||
|
|
||||||
#include "tfreelist.h"
|
|
||||||
|
|
||||||
TEST(TD_UTIL_FREELIST_TEST, simple_test) {
|
|
||||||
SFreeList fl;
|
|
||||||
|
|
||||||
tFreeListInit(&fl);
|
|
||||||
|
|
||||||
for (size_t i = 0; i < 1000; i++) {
|
|
||||||
void *ptr = NULL;
|
|
||||||
TFL_MALLOC(ptr, void*, 1024, &fl);
|
|
||||||
GTEST_ASSERT_NE(ptr, nullptr);
|
|
||||||
}
|
|
||||||
|
|
||||||
tFreeListClear(&fl);
|
|
||||||
}
|
|
Loading…
Reference in New Issue