refact vnode
This commit is contained in:
parent
9221e6f403
commit
c032e89af6
|
@ -115,7 +115,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
vnodeProcessWMsgs(pVnode->pImpl, pArray);
|
vnodePreprocessWriteReqs(pVnode->pImpl, pArray);
|
||||||
|
|
||||||
numOfMsgs = taosArrayGetSize(pArray);
|
numOfMsgs = taosArrayGetSize(pArray);
|
||||||
for (int32_t i = 0; i < numOfMsgs; i++) {
|
for (int32_t i = 0; i < numOfMsgs; i++) {
|
||||||
|
@ -123,7 +123,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
||||||
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
||||||
SRpcMsg *pRsp = NULL;
|
SRpcMsg *pRsp = NULL;
|
||||||
|
|
||||||
int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp);
|
int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, &pRsp);
|
||||||
if (pRsp != NULL) {
|
if (pRsp != NULL) {
|
||||||
pRsp->ahandle = pRpc->ahandle;
|
pRsp->ahandle = pRpc->ahandle;
|
||||||
tmsgSendRsp(pRsp);
|
tmsgSendRsp(pRsp);
|
||||||
|
@ -153,7 +153,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
||||||
|
|
||||||
// todo
|
// todo
|
||||||
SRpcMsg *pRsp = NULL;
|
SRpcMsg *pRsp = NULL;
|
||||||
(void)vnodeApplyWMsg(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
|
(void)vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,6 @@ target_sources(
|
||||||
"src/meta/metaTDBImpl.c"
|
"src/meta/metaTDBImpl.c"
|
||||||
|
|
||||||
# tsdb
|
# tsdb
|
||||||
# "src/tsdb/tsdbBDBImpl.c"
|
|
||||||
"src/tsdb/tsdbTDBImpl.c"
|
"src/tsdb/tsdbTDBImpl.c"
|
||||||
"src/tsdb/tsdbCommit.c"
|
"src/tsdb/tsdbCommit.c"
|
||||||
"src/tsdb/tsdbCompact.c"
|
"src/tsdb/tsdbCompact.c"
|
||||||
|
|
|
@ -47,8 +47,8 @@ void vnodeCleanup();
|
||||||
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg);
|
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg);
|
||||||
void vnodeClose(SVnode *pVnode);
|
void vnodeClose(SVnode *pVnode);
|
||||||
void vnodeDestroy(const char *path);
|
void vnodeDestroy(const char *path);
|
||||||
void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs);
|
void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs);
|
||||||
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||||
|
|
|
@ -1,169 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#define ALLOW_FORBID_FUNC
|
|
||||||
#include "db.h"
|
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
|
||||||
|
|
||||||
#define IMPL_WITH_LOCK 1
|
|
||||||
|
|
||||||
static int tsdbOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup);
|
|
||||||
static void tsdbCloseBDBDb(DB *pDB);
|
|
||||||
|
|
||||||
#define BDB_PERR(info, code) fprintf(stderr, "%s:%d " info " reason: %s\n", __FILE__, __LINE__, db_strerror(code))
|
|
||||||
|
|
||||||
int32_t tsdbOpenDBF(TDBEnv pEnv, SDBFile *pDBF) {
|
|
||||||
// TDBEnv is shared by a group of SDBFile
|
|
||||||
if (!pEnv) {
|
|
||||||
terrno = TSDB_CODE_INVALID_PTR;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Open DBF
|
|
||||||
if (tsdbOpenBDBDb(&(pDBF->pDB), pEnv, pDBF->path, false) < 0) {
|
|
||||||
terrno = TSDB_CODE_TDB_INIT_FAILED;
|
|
||||||
tsdbCloseBDBDb(pDBF->pDB);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void tsdbCloseDBF(SDBFile *pDBF) {
|
|
||||||
if (pDBF->pDB) {
|
|
||||||
tsdbCloseBDBDb(pDBF->pDB);
|
|
||||||
pDBF->pDB = NULL;
|
|
||||||
}
|
|
||||||
taosMemoryFreeClear(pDBF->path);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path) {
|
|
||||||
int ret = 0;
|
|
||||||
DB_ENV *pEnv = NULL;
|
|
||||||
|
|
||||||
if (path == NULL) return 0;
|
|
||||||
|
|
||||||
ret = db_env_create(&pEnv, 0);
|
|
||||||
if (ret != 0) {
|
|
||||||
BDB_PERR("Failed to create tsdb env", ret);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = pEnv->open(pEnv, path, DB_CREATE | DB_INIT_CDB | DB_INIT_MPOOL, 0);
|
|
||||||
if (ret != 0) {
|
|
||||||
terrno = TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR;
|
|
||||||
tsdbWarn("Failed to open tsdb env for path %s since ret %d != 0", path ? path : "NULL", ret);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
*ppEnv = pEnv;
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void tsdbCloseBDBEnv(DB_ENV *pEnv) {
|
|
||||||
if (pEnv) {
|
|
||||||
pEnv->close(pEnv, 0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static int tsdbOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup) {
|
|
||||||
int ret;
|
|
||||||
DB *pDB;
|
|
||||||
|
|
||||||
ret = db_create(&(pDB), pEnv, 0);
|
|
||||||
if (ret != 0) {
|
|
||||||
BDB_PERR("Failed to create DBP", ret);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (isDup) {
|
|
||||||
ret = pDB->set_flags(pDB, DB_DUPSORT);
|
|
||||||
if (ret != 0) {
|
|
||||||
BDB_PERR("Failed to set DB flags", ret);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = pDB->open(pDB, NULL, pFName, NULL, DB_BTREE, DB_CREATE, 0);
|
|
||||||
if (ret) {
|
|
||||||
BDB_PERR("Failed to open DBF", ret);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
*ppDB = pDB;
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void tsdbCloseBDBDb(DB *pDB) {
|
|
||||||
if (pDB) {
|
|
||||||
pDB->close(pDB, 0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *key, uint32_t keySize, void *data, uint32_t dataSize) {
|
|
||||||
int ret;
|
|
||||||
DBT key1 = {0}, value1 = {0};
|
|
||||||
|
|
||||||
key1.data = key;
|
|
||||||
key1.size = keySize;
|
|
||||||
|
|
||||||
value1.data = data;
|
|
||||||
value1.size = dataSize;
|
|
||||||
|
|
||||||
// TODO: lock
|
|
||||||
ret = pDBF->pDB->put(pDBF->pDB, NULL, &key1, &value1, 0);
|
|
||||||
if (ret) {
|
|
||||||
BDB_PERR("Failed to put data to DBF", ret);
|
|
||||||
// TODO: unlock
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
// TODO: unlock
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void *tsdbGetSmaDataByKey(SDBFile *pDBF, void *key, uint32_t keySize, uint32_t *valueSize) {
|
|
||||||
void *result = NULL;
|
|
||||||
DBT key1 = {0};
|
|
||||||
DBT value1 = {0};
|
|
||||||
int ret;
|
|
||||||
|
|
||||||
// Set key/value
|
|
||||||
key1.data = key;
|
|
||||||
key1.size = keySize;
|
|
||||||
|
|
||||||
// Query
|
|
||||||
// TODO: lock
|
|
||||||
ret = pDBF->pDB->get(pDBF->pDB, NULL, &key1, &value1, 0);
|
|
||||||
// TODO: unlock
|
|
||||||
if (ret != 0) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
result = taosMemoryCalloc(1, value1.size);
|
|
||||||
|
|
||||||
if (result == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
*valueSize = value1.size;
|
|
||||||
memcpy(result, value1.data, value1.size);
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
|
|
@ -15,7 +15,11 @@
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
|
static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq);
|
||||||
|
static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg **pRsp);
|
||||||
|
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq);
|
||||||
|
|
||||||
|
void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs) {
|
||||||
SNodeMsg *pMsg;
|
SNodeMsg *pMsg;
|
||||||
SRpcMsg *pRpc;
|
SRpcMsg *pRpc;
|
||||||
|
|
||||||
|
@ -36,14 +40,9 @@ void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
|
||||||
}
|
}
|
||||||
|
|
||||||
walFsync(pVnode->pWal, false);
|
walFsync(pVnode->pWal, false);
|
||||||
|
|
||||||
// TODO: Integrate RAFT module here
|
|
||||||
|
|
||||||
// No results are returned because error handling is difficult
|
|
||||||
// return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
void *ptr = NULL;
|
void *ptr = NULL;
|
||||||
|
|
||||||
if (pVnode->config.streamMode == 0) {
|
if (pVnode->config.streamMode == 0) {
|
||||||
|
@ -64,108 +63,16 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
case TDMT_VND_CREATE_STB: {
|
case TDMT_VND_CREATE_STB:
|
||||||
SVCreateTbReq vCreateTbReq = {0};
|
return vnodeProcessCreateStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)));
|
||||||
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq);
|
case TDMT_VND_CREATE_TABLE:
|
||||||
if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) {
|
return vnodeProcessCreateTbReq(pVnode, pMsg, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pRsp);
|
||||||
// TODO: handle error
|
case TDMT_VND_ALTER_STB:
|
||||||
}
|
return vnodeProcessAlterStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)));
|
||||||
|
|
||||||
// TODO: to encapsule a free API
|
|
||||||
taosMemoryFree(vCreateTbReq.stbCfg.pSchema);
|
|
||||||
taosMemoryFree(vCreateTbReq.stbCfg.pTagSchema);
|
|
||||||
if (vCreateTbReq.stbCfg.pRSmaParam) {
|
|
||||||
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam->pFuncIds);
|
|
||||||
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam);
|
|
||||||
}
|
|
||||||
taosMemoryFree(vCreateTbReq.dbFName);
|
|
||||||
taosMemoryFree(vCreateTbReq.name);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case TDMT_VND_CREATE_TABLE: {
|
|
||||||
SVCreateTbBatchReq vCreateTbBatchReq = {0};
|
|
||||||
SVCreateTbBatchRsp vCreateTbBatchRsp = {0};
|
|
||||||
tDeserializeSVCreateTbBatchReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq);
|
|
||||||
int reqNum = taosArrayGetSize(vCreateTbBatchReq.pArray);
|
|
||||||
for (int i = 0; i < reqNum; i++) {
|
|
||||||
SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i);
|
|
||||||
|
|
||||||
char tableFName[TSDB_TABLE_FNAME_LEN];
|
|
||||||
SMsgHead *pHead = (SMsgHead *)pMsg->pCont;
|
|
||||||
sprintf(tableFName, "%s.%s", pCreateTbReq->dbFName, pCreateTbReq->name);
|
|
||||||
|
|
||||||
int32_t code = vnodeValidateTableHash(&pVnode->config, tableFName);
|
|
||||||
if (code) {
|
|
||||||
SVCreateTbRsp rsp;
|
|
||||||
rsp.code = code;
|
|
||||||
|
|
||||||
taosArrayPush(vCreateTbBatchRsp.rspList, &rsp);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) {
|
|
||||||
// TODO: handle error
|
|
||||||
vError("vgId:%d, failed to create table: %s", pVnode->vgId, pCreateTbReq->name);
|
|
||||||
}
|
|
||||||
// TODO: to encapsule a free API
|
|
||||||
taosMemoryFree(pCreateTbReq->name);
|
|
||||||
taosMemoryFree(pCreateTbReq->dbFName);
|
|
||||||
if (pCreateTbReq->type == TD_SUPER_TABLE) {
|
|
||||||
taosMemoryFree(pCreateTbReq->stbCfg.pSchema);
|
|
||||||
taosMemoryFree(pCreateTbReq->stbCfg.pTagSchema);
|
|
||||||
if (pCreateTbReq->stbCfg.pRSmaParam) {
|
|
||||||
taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam->pFuncIds);
|
|
||||||
taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam);
|
|
||||||
}
|
|
||||||
} else if (pCreateTbReq->type == TD_CHILD_TABLE) {
|
|
||||||
taosMemoryFree(pCreateTbReq->ctbCfg.pTag);
|
|
||||||
} else {
|
|
||||||
taosMemoryFree(pCreateTbReq->ntbCfg.pSchema);
|
|
||||||
if (pCreateTbReq->ntbCfg.pRSmaParam) {
|
|
||||||
taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam->pFuncIds);
|
|
||||||
taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
vTrace("vgId:%d process create %" PRIzu " tables", pVnode->vgId, taosArrayGetSize(vCreateTbBatchReq.pArray));
|
|
||||||
taosArrayDestroy(vCreateTbBatchReq.pArray);
|
|
||||||
if (vCreateTbBatchRsp.rspList) {
|
|
||||||
int32_t contLen = tSerializeSVCreateTbBatchRsp(NULL, 0, &vCreateTbBatchRsp);
|
|
||||||
void *msg = rpcMallocCont(contLen);
|
|
||||||
tSerializeSVCreateTbBatchRsp(msg, contLen, &vCreateTbBatchRsp);
|
|
||||||
taosArrayDestroy(vCreateTbBatchRsp.rspList);
|
|
||||||
|
|
||||||
*pRsp = taosMemoryCalloc(1, sizeof(SRpcMsg));
|
|
||||||
(*pRsp)->msgType = TDMT_VND_CREATE_TABLE_RSP;
|
|
||||||
(*pRsp)->pCont = msg;
|
|
||||||
(*pRsp)->contLen = contLen;
|
|
||||||
(*pRsp)->handle = pMsg->handle;
|
|
||||||
(*pRsp)->ahandle = pMsg->ahandle;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case TDMT_VND_ALTER_STB: {
|
|
||||||
SVCreateTbReq vAlterTbReq = {0};
|
|
||||||
vTrace("vgId:%d, process alter stb req", pVnode->vgId);
|
|
||||||
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vAlterTbReq);
|
|
||||||
// TODO: to encapsule a free API
|
|
||||||
taosMemoryFree(vAlterTbReq.stbCfg.pSchema);
|
|
||||||
taosMemoryFree(vAlterTbReq.stbCfg.pTagSchema);
|
|
||||||
if (vAlterTbReq.stbCfg.pRSmaParam) {
|
|
||||||
taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam->pFuncIds);
|
|
||||||
taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam);
|
|
||||||
}
|
|
||||||
taosMemoryFree(vAlterTbReq.dbFName);
|
|
||||||
taosMemoryFree(vAlterTbReq.name);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case TDMT_VND_DROP_STB:
|
case TDMT_VND_DROP_STB:
|
||||||
vTrace("vgId:%d, process drop stb req", pVnode->vgId);
|
vTrace("vgId:%d, process drop stb req", pVnode->vgId);
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_DROP_TABLE:
|
case TDMT_VND_DROP_TABLE:
|
||||||
// if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) {
|
|
||||||
// // TODO: handle error
|
|
||||||
// }
|
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_SUBMIT:
|
case TDMT_VND_SUBMIT:
|
||||||
/*printf("vnode %d write data %ld\n", pVnode->vgId, ver);*/
|
/*printf("vnode %d write data %ld\n", pVnode->vgId, ver);*/
|
||||||
|
@ -199,32 +106,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
case TDMT_VND_CREATE_SMA: { // timeRangeSMA
|
case TDMT_VND_CREATE_SMA: { // timeRangeSMA
|
||||||
#if 0
|
|
||||||
|
|
||||||
SSmaCfg vCreateSmaReq = {0};
|
|
||||||
if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
vWarn("vgId:%d TDMT_VND_CREATE_SMA received but deserialize failed since %s", pVnode->config.vgId,
|
|
||||||
terrstr(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
vDebug("vgId:%d TDMT_VND_CREATE_SMA msg received for %s:%" PRIi64, pVnode->config.vgId,
|
|
||||||
vCreateSmaReq.tSma.indexName, vCreateSmaReq.tSma.indexUid);
|
|
||||||
|
|
||||||
// record current timezone of server side
|
|
||||||
vCreateSmaReq.tSma.timezoneInt = tsTimezone;
|
|
||||||
|
|
||||||
if (metaCreateTSma(pVnode->pMeta, &vCreateSmaReq) < 0) {
|
|
||||||
// TODO: handle error
|
|
||||||
tdDestroyTSma(&vCreateSmaReq.tSma);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tsdbTSmaAdd(pVnode->pTsdb, 1);
|
|
||||||
|
|
||||||
tdDestroyTSma(&vCreateSmaReq.tSma);
|
|
||||||
// TODO: return directly or go on follow steps?
|
|
||||||
#endif
|
|
||||||
if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
|
if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
|
||||||
// TODO
|
// TODO
|
||||||
}
|
}
|
||||||
|
@ -235,33 +117,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
// if (tsdbDropTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
|
// if (tsdbDropTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
|
||||||
// // TODO
|
// // TODO
|
||||||
// }
|
// }
|
||||||
#if 0
|
|
||||||
tsdbTSmaSub(pVnode->pTsdb, 1);
|
|
||||||
SVDropTSmaReq vDropSmaReq = {0};
|
|
||||||
if (tDeserializeSVDropTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vDropSmaReq) == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: send msg to stream computing to drop tSma
|
|
||||||
// if ((send msg to stream computing) < 0) {
|
|
||||||
// tdDestroyTSma(&vCreateSmaReq);
|
|
||||||
// return -1;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
|
|
||||||
if (metaDropTSma(pVnode->pMeta, vDropSmaReq.indexUid) < 0) {
|
|
||||||
// TODO: handle error
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if(tsdbDropTSmaData(pVnode->pTsdb, vDropSmaReq.indexUid) < 0) {
|
|
||||||
// TODO: handle error
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: return directly or go on follow steps?
|
|
||||||
#endif
|
|
||||||
} break;
|
} break;
|
||||||
default:
|
default:
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -341,4 +197,104 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
|
||||||
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
/*vInfo("sync message is processed");*/
|
/*vInfo("sync message is processed");*/
|
||||||
return 0;
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq) {
|
||||||
|
SVCreateTbReq vCreateTbReq = {0};
|
||||||
|
tDeserializeSVCreateTbReq(pReq, &vCreateTbReq);
|
||||||
|
if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) {
|
||||||
|
// TODO
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(vCreateTbReq.stbCfg.pSchema);
|
||||||
|
taosMemoryFree(vCreateTbReq.stbCfg.pTagSchema);
|
||||||
|
if (vCreateTbReq.stbCfg.pRSmaParam) {
|
||||||
|
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam->pFuncIds);
|
||||||
|
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam);
|
||||||
|
}
|
||||||
|
taosMemoryFree(vCreateTbReq.dbFName);
|
||||||
|
taosMemoryFree(vCreateTbReq.name);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg **pRsp) {
|
||||||
|
SVCreateTbBatchReq vCreateTbBatchReq = {0};
|
||||||
|
SVCreateTbBatchRsp vCreateTbBatchRsp = {0};
|
||||||
|
tDeserializeSVCreateTbBatchReq(pReq, &vCreateTbBatchReq);
|
||||||
|
int reqNum = taosArrayGetSize(vCreateTbBatchReq.pArray);
|
||||||
|
for (int i = 0; i < reqNum; i++) {
|
||||||
|
SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i);
|
||||||
|
|
||||||
|
char tableFName[TSDB_TABLE_FNAME_LEN];
|
||||||
|
SMsgHead *pHead = (SMsgHead *)pMsg->pCont;
|
||||||
|
sprintf(tableFName, "%s.%s", pCreateTbReq->dbFName, pCreateTbReq->name);
|
||||||
|
|
||||||
|
int32_t code = vnodeValidateTableHash(&pVnode->config, tableFName);
|
||||||
|
if (code) {
|
||||||
|
SVCreateTbRsp rsp;
|
||||||
|
rsp.code = code;
|
||||||
|
|
||||||
|
taosArrayPush(vCreateTbBatchRsp.rspList, &rsp);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) {
|
||||||
|
// TODO: handle error
|
||||||
|
vError("vgId:%d, failed to create table: %s", pVnode->vgId, pCreateTbReq->name);
|
||||||
|
}
|
||||||
|
// TODO: to encapsule a free API
|
||||||
|
taosMemoryFree(pCreateTbReq->name);
|
||||||
|
taosMemoryFree(pCreateTbReq->dbFName);
|
||||||
|
if (pCreateTbReq->type == TD_SUPER_TABLE) {
|
||||||
|
taosMemoryFree(pCreateTbReq->stbCfg.pSchema);
|
||||||
|
taosMemoryFree(pCreateTbReq->stbCfg.pTagSchema);
|
||||||
|
if (pCreateTbReq->stbCfg.pRSmaParam) {
|
||||||
|
taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam->pFuncIds);
|
||||||
|
taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam);
|
||||||
|
}
|
||||||
|
} else if (pCreateTbReq->type == TD_CHILD_TABLE) {
|
||||||
|
taosMemoryFree(pCreateTbReq->ctbCfg.pTag);
|
||||||
|
} else {
|
||||||
|
taosMemoryFree(pCreateTbReq->ntbCfg.pSchema);
|
||||||
|
if (pCreateTbReq->ntbCfg.pRSmaParam) {
|
||||||
|
taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam->pFuncIds);
|
||||||
|
taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
vTrace("vgId:%d process create %" PRIzu " tables", pVnode->vgId, taosArrayGetSize(vCreateTbBatchReq.pArray));
|
||||||
|
taosArrayDestroy(vCreateTbBatchReq.pArray);
|
||||||
|
if (vCreateTbBatchRsp.rspList) {
|
||||||
|
int32_t contLen = tSerializeSVCreateTbBatchRsp(NULL, 0, &vCreateTbBatchRsp);
|
||||||
|
void *msg = rpcMallocCont(contLen);
|
||||||
|
tSerializeSVCreateTbBatchRsp(msg, contLen, &vCreateTbBatchRsp);
|
||||||
|
taosArrayDestroy(vCreateTbBatchRsp.rspList);
|
||||||
|
|
||||||
|
*pRsp = taosMemoryCalloc(1, sizeof(SRpcMsg));
|
||||||
|
(*pRsp)->msgType = TDMT_VND_CREATE_TABLE_RSP;
|
||||||
|
(*pRsp)->pCont = msg;
|
||||||
|
(*pRsp)->contLen = contLen;
|
||||||
|
(*pRsp)->handle = pMsg->handle;
|
||||||
|
(*pRsp)->ahandle = pMsg->ahandle;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq) {
|
||||||
|
SVCreateTbReq vAlterTbReq = {0};
|
||||||
|
vTrace("vgId:%d, process alter stb req", pVnode->vgId);
|
||||||
|
tDeserializeSVCreateTbReq(pReq, &vAlterTbReq);
|
||||||
|
// TODO: to encapsule a free API
|
||||||
|
taosMemoryFree(vAlterTbReq.stbCfg.pSchema);
|
||||||
|
taosMemoryFree(vAlterTbReq.stbCfg.pTagSchema);
|
||||||
|
if (vAlterTbReq.stbCfg.pRSmaParam) {
|
||||||
|
taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam->pFuncIds);
|
||||||
|
taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam);
|
||||||
|
}
|
||||||
|
taosMemoryFree(vAlterTbReq.dbFName);
|
||||||
|
taosMemoryFree(vAlterTbReq.name);
|
||||||
|
return 0;
|
||||||
}
|
}
|
Loading…
Reference in New Issue