diff --git a/include/util/tencode.h b/include/util/tencode.h index cdde378b69..7c877ae428 100644 --- a/include/util/tencode.h +++ b/include/util/tencode.h @@ -18,7 +18,6 @@ #include "tcoding.h" #include "tfreelist.h" -#include "tmacro.h" #ifdef __cplusplus extern "C" { diff --git a/include/util/tmacro.h b/include/util/tmacro.h deleted file mode 100644 index 07c6e6509e..0000000000 --- a/include/util/tmacro.h +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_UTIL_MACRO_H_ -#define _TD_UTIL_MACRO_H_ - -#include "os.h" - -#ifdef __cplusplus -extern "C" { -#endif - -// Module init/clear MACRO definitions -#define TD_MOD_UNINITIALIZED 0 -#define TD_MOD_INITIALIZED 1 - -typedef int8_t td_mode_flag_t; - -#define TD_CHECK_AND_SET_MODE_INIT(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNINITIALIZED, TD_MOD_INITIALIZED) -#define TD_CHECK_AND_SET_MOD_CLEAR(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_INITIALIZED, TD_MOD_UNINITIALIZED) - -#define TD_IS_NULL(PTR) ((PTR) == NULL) - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_UTIL_MACRO_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index ff92cf880b..2fb29ce944 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -115,7 +115,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO } } - vnodeProcessWMsgs(pVnode->pImpl, pArray); + vnodePreprocessWriteReqs(pVnode->pImpl, pArray); numOfMsgs = taosArrayGetSize(pArray); for (int32_t i = 0; i < numOfMsgs; i++) { @@ -123,7 +123,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg *pRsp = NULL; - int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp); + int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, &pRsp); if (pRsp != NULL) { pRsp->ahandle = pRpc->ahandle; tmsgSendRsp(pRsp); @@ -153,7 +153,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO // todo SRpcMsg *pRsp = NULL; - (void)vnodeApplyWMsg(pVnode->pImpl, &pMsg->rpcMsg, &pRsp); + (void)vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp); } } diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index f43f0b427e..3ab007e3a2 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -6,6 +6,7 @@ target_sources( # vnode "src/vnd/vnodeArenaMAImpl.c" "src/vnd/vnodeBufferPool.c" + # "src/vnd/vnodeBufferPool2.c" "src/vnd/vnodeCfg.c" "src/vnd/vnodeCommit.c" "src/vnd/vnodeInt.c" @@ -14,7 +15,7 @@ target_sources( "src/vnd/vnodeStateMgr.c" "src/vnd/vnodeWrite.c" "src/vnd/vnodeModule.c" - # "src/vnd/vnodeMgr.c" + "src/vnd/vnodeSvr.c" # meta # "src/meta/metaBDBImpl.c" @@ -25,7 +26,6 @@ target_sources( "src/meta/metaTDBImpl.c" # tsdb - # "src/tsdb/tsdbBDBImpl.c" "src/tsdb/tsdbTDBImpl.c" "src/tsdb/tsdbCommit.c" "src/tsdb/tsdbCompact.c" diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 7ed0092d36..76fc09ca1d 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -47,8 +47,8 @@ void vnodeCleanup(); SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg); void vnodeClose(SVnode *pVnode); void vnodeDestroy(const char *path); -void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs); -int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs); +int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index d43503c1ca..94a1266f46 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -96,6 +96,7 @@ tb_uid_t metaGenerateUid(SMeta* pMeta); struct SMeta { char* path; + SVnode* pVnode; SMetaCfg options; SMetaDB* pDB; SMetaIdx* pIdx; diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 6fbf265d2c..d0e005f990 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -157,6 +157,7 @@ typedef struct { struct STsdb { int32_t vgId; + SVnode *pVnode; bool repoLocked; TdThreadMutex mutex; char *path; @@ -954,6 +955,43 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) { // return 0; // } +typedef struct SSmaKey SSmaKey; + +struct SSmaKey { + TSKEY skey; + int64_t groupId; +}; + +typedef struct SDBFile SDBFile; + +struct SDBFile { + int32_t fid; + TDB *pDB; + char *path; +}; + +int32_t tsdbOpenDBEnv(TENV **ppEnv, const char *path); +int32_t tsdbCloseDBEnv(TENV *pEnv); +int32_t tsdbOpenDBF(TENV *pEnv, SDBFile *pDBF); +int32_t tsdbCloseDBF(SDBFile *pDBF); +int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn); +void *tsdbGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32_t *valLen); + +void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv); +void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv); +#if 0 +int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result); +int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin); +#endif + +// internal func +static FORCE_INLINE int32_t tsdbEncodeTSmaKey(int64_t groupId, TSKEY tsKey, void **pData) { + int32_t len = 0; + len += taosEncodeFixedI64(pData, tsKey); + len += taosEncodeFixedI64(pData, groupId); + return len; +} + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/tsdbSma.h b/source/dnode/vnode/src/inc/tsdbSma.h deleted file mode 100644 index ce03fa7c67..0000000000 --- a/source/dnode/vnode/src/inc/tsdbSma.h +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_VNODE_TSDB_SMA_H_ -#define _TD_VNODE_TSDB_SMA_H_ - -#include "tdbInt.h" - -#ifdef __cplusplus -extern "C" { -#endif - -typedef struct SSmaKey SSmaKey; - -struct SSmaKey { - TSKEY skey; - int64_t groupId; -}; - - -typedef struct SDBFile SDBFile; - -struct SDBFile { - int32_t fid; - TDB *pDB; - char *path; -}; - -int32_t tsdbOpenDBEnv(TENV **ppEnv, const char *path); -int32_t tsdbCloseDBEnv(TENV *pEnv); -int32_t tsdbOpenDBF(TENV *pEnv, SDBFile *pDBF); -int32_t tsdbCloseDBF(SDBFile *pDBF); -int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn); -void *tsdbGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32_t *valLen); - -void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv); -void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv); -#if 0 -int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result); -int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin); -#endif - -// internal func -static FORCE_INLINE int32_t tsdbEncodeTSmaKey(int64_t groupId, TSKEY tsKey, void **pData) { - int32_t len = 0; - len += taosEncodeFixedI64(pData, tsKey); - len += taosEncodeFixedI64(pData, groupId); - return len; -} - - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_VNODE_TSDB_SMA_H_*/ \ No newline at end of file diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 6957cfdb73..1cdb38b650 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -36,6 +36,7 @@ int vnodeScheduleTask(int (*execute)(void*), void* arg); // vnodeQuery ==================== int vnodeQueryOpen(SVnode* pVnode); void vnodeQueryClose(SVnode* pVnode); +int vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg); #if 1 // SVBufPool diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index c956184fc9..f988df01cb 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -24,12 +24,12 @@ #include "tcoding.h" #include "tcompression.h" #include "tdatablock.h" +#include "tdbInt.h" #include "tfs.h" #include "tglobal.h" #include "tlist.h" #include "tlockfree.h" #include "tlosertree.h" -#include "tmacro.h" #include "tmallocator.h" #include "tskiplist.h" #include "tstream.h" @@ -100,8 +100,6 @@ void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); #include "tq.h" -#include "tsdbSma.h" - #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c b/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c deleted file mode 100644 index acd9c2dcaa..0000000000 --- a/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define ALLOW_FORBID_FUNC -#include "db.h" - -#include "vnodeInt.h" - -#define IMPL_WITH_LOCK 1 - -static int tsdbOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup); -static void tsdbCloseBDBDb(DB *pDB); - -#define BDB_PERR(info, code) fprintf(stderr, "%s:%d " info " reason: %s\n", __FILE__, __LINE__, db_strerror(code)) - -int32_t tsdbOpenDBF(TDBEnv pEnv, SDBFile *pDBF) { - // TDBEnv is shared by a group of SDBFile - if (!pEnv) { - terrno = TSDB_CODE_INVALID_PTR; - return -1; - } - - // Open DBF - if (tsdbOpenBDBDb(&(pDBF->pDB), pEnv, pDBF->path, false) < 0) { - terrno = TSDB_CODE_TDB_INIT_FAILED; - tsdbCloseBDBDb(pDBF->pDB); - return -1; - } - - return 0; -} - -void tsdbCloseDBF(SDBFile *pDBF) { - if (pDBF->pDB) { - tsdbCloseBDBDb(pDBF->pDB); - pDBF->pDB = NULL; - } - taosMemoryFreeClear(pDBF->path); -} - -int32_t tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path) { - int ret = 0; - DB_ENV *pEnv = NULL; - - if (path == NULL) return 0; - - ret = db_env_create(&pEnv, 0); - if (ret != 0) { - BDB_PERR("Failed to create tsdb env", ret); - return -1; - } - - ret = pEnv->open(pEnv, path, DB_CREATE | DB_INIT_CDB | DB_INIT_MPOOL, 0); - if (ret != 0) { - terrno = TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR; - tsdbWarn("Failed to open tsdb env for path %s since ret %d != 0", path ? path : "NULL", ret); - return -1; - } - - *ppEnv = pEnv; - - return 0; -} - -void tsdbCloseBDBEnv(DB_ENV *pEnv) { - if (pEnv) { - pEnv->close(pEnv, 0); - } -} - -static int tsdbOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup) { - int ret; - DB *pDB; - - ret = db_create(&(pDB), pEnv, 0); - if (ret != 0) { - BDB_PERR("Failed to create DBP", ret); - return -1; - } - - if (isDup) { - ret = pDB->set_flags(pDB, DB_DUPSORT); - if (ret != 0) { - BDB_PERR("Failed to set DB flags", ret); - return -1; - } - } - - ret = pDB->open(pDB, NULL, pFName, NULL, DB_BTREE, DB_CREATE, 0); - if (ret) { - BDB_PERR("Failed to open DBF", ret); - return -1; - } - - *ppDB = pDB; - - return 0; -} - -static void tsdbCloseBDBDb(DB *pDB) { - if (pDB) { - pDB->close(pDB, 0); - } -} - -int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *key, uint32_t keySize, void *data, uint32_t dataSize) { - int ret; - DBT key1 = {0}, value1 = {0}; - - key1.data = key; - key1.size = keySize; - - value1.data = data; - value1.size = dataSize; - - // TODO: lock - ret = pDBF->pDB->put(pDBF->pDB, NULL, &key1, &value1, 0); - if (ret) { - BDB_PERR("Failed to put data to DBF", ret); - // TODO: unlock - return -1; - } - // TODO: unlock - - return 0; -} - -void *tsdbGetSmaDataByKey(SDBFile *pDBF, void *key, uint32_t keySize, uint32_t *valueSize) { - void *result = NULL; - DBT key1 = {0}; - DBT value1 = {0}; - int ret; - - // Set key/value - key1.data = key; - key1.size = keySize; - - // Query - // TODO: lock - ret = pDBF->pDB->get(pDBF->pDB, NULL, &key1, &value1, 0); - // TODO: unlock - if (ret != 0) { - return NULL; - } - - result = taosMemoryCalloc(1, value1.size); - - if (result == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - *valueSize = value1.size; - memcpy(result, value1.data, value1.size); - - return result; -} \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeBufferPool2.c b/source/dnode/vnode/src/vnd/vnodeBufferPool2.c new file mode 100644 index 0000000000..d63c86734a --- /dev/null +++ b/source/dnode/vnode/src/vnd/vnodeBufferPool2.c @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "vnodeInt.h" + +/* ------------------------ STRUCTURES ------------------------ */ + +static int vnodeBufPoolCreate(int size, SVBufPool **ppPool); +static int vnodeBufPoolDestroy(SVBufPool *pPool); + +int vnodeOpenBufPool(SVnode *pVnode, int64_t size) { + SVBufPool *pPool = NULL; + int ret; + + ASSERT(pVnode->pPool == NULL); + + for (int i = 0; i < 3; i++) { + // create pool + ret = vnodeBufPoolCreate(size, &pPool); + if (ret < 0) { + vError("vgId:%d failed to open vnode buffer pool since %s", TD_VNODE_ID(pVnode), tstrerror(terrno)); + vnodeCloseBufPool(pVnode); + return -1; + } + + // add pool to queue + pPool->next = pVnode->pPool; + pVnode->pPool = pPool; + } + + vDebug("vgId:%d vnode buffer pool is opened, pool size: %" PRId64, TD_VNODE_ID(pVnode), size); + + return 0; +} + +int vnodeCloseBufPool(SVnode *pVnode) { + SVBufPool *pPool; + + for (pPool = pVnode->pPool; pPool; pPool = pVnode->pPool) { + pVnode->pPool = pPool->next; + vnodeBufPoolDestroy(pPool); + } + + vDebug("vgId:%d vnode buffer pool is closed", TD_VNODE_ID(pVnode)); + + return 0; +} + +void vnodeBufPoolReset(SVBufPool *pPool) { + SVBufPoolNode *pNode; + + for (pNode = pPool->pTail; pNode->prev; pNode = pPool->pTail) { + ASSERT(pNode->pnext == &pPool->pTail); + pNode->prev->pnext = &pPool->pTail; + pPool->pTail = pNode->prev; + pPool->size = pPool->size - sizeof(*pNode) - pNode->size; + taosMemoryFree(pNode); + } + + ASSERT(pPool->size == pPool->ptr - pPool->node.data); + + pPool->size = 0; + pPool->ptr = pPool->node.data; +} + +void *vnodeBufPoolMalloc(SVBufPool *pPool, size_t size) { + SVBufPoolNode *pNode; + void *p; + + if (pPool->node.size >= pPool->ptr - pPool->node.data + size) { + // allocate from the anchor node + p = pPool->ptr; + pPool->ptr = pPool->ptr + size; + pPool->size += size; + } else { + // allocate a new node + pNode = taosMemoryMalloc(sizeof(*pNode) + size); + if (pNode == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + p = pNode->data; + pNode->size = size; + pNode->prev = pPool->pTail; + pNode->pnext = &pPool->pTail; + pPool->pTail->pnext = &pNode->prev; + pPool->pTail = pNode; + + pPool->size = pPool->size + sizeof(*pNode) + size; + } + + return p; +} + +void vnodeBufPoolFree(SVBufPool *pPool, void *p) { + uint8_t *ptr = (uint8_t *)p; + SVBufPoolNode *pNode; + + if (ptr < pPool->node.data || ptr >= pPool->node.data + pPool->node.size) { + pNode = &((SVBufPoolNode *)p)[-1]; + *pNode->pnext = pNode->prev; + pNode->prev->pnext = pNode->pnext; + + pPool->size = pPool->size - sizeof(*pNode) - pNode->size; + taosMemoryFree(pNode); + } +} + +// STATIC METHODS ------------------- +static int vnodeBufPoolCreate(int size, SVBufPool **ppPool) { + SVBufPool *pPool; + + pPool = taosMemoryMalloc(sizeof(SVBufPool) + size); + if (pPool == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pPool->next = NULL; + pPool->nRef = 0; + pPool->size = 0; + pPool->ptr = pPool->node.data; + pPool->pTail = &pPool->node; + pPool->node.prev = NULL; + pPool->node.pnext = &pPool->pTail; + pPool->node.size = size; + + *ppPool = pPool; + return 0; +} + +static int vnodeBufPoolDestroy(SVBufPool *pPool) { + vnodeBufPoolReset(pPool); + taosMemoryFree(pPool); + return 0; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeInt.c b/source/dnode/vnode/src/vnd/vnodeInt.c index 270dc377b9..10d8154a15 100644 --- a/source/dnode/vnode/src/vnd/vnodeInt.c +++ b/source/dnode/vnode/src/vnd/vnodeInt.c @@ -21,25 +21,4 @@ int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; } int32_t vnodeCompact(SVnode *pVnode) { return 0; } -int32_t vnodeSync(SVnode *pVnode) { return 0; } - -int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { - pLoad->vgId = pVnode->vgId; - pLoad->role = TAOS_SYNC_STATE_LEADER; - pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); - pLoad->numOfTimeSeries = 400; - pLoad->totalStorage = 300; - pLoad->compStorage = 200; - pLoad->pointsWritten = 100; - pLoad->numOfSelectReqs = 1; - pLoad->numOfInsertReqs = 3; - pLoad->numOfInsertSuccessReqs = 2; - pLoad->numOfBatchInsertReqs = 5; - pLoad->numOfBatchInsertSuccessReqs = 4; - return 0; -} - -int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - /*vInfo("sync message is processed");*/ - return 0; -} +int32_t vnodeSync(SVnode *pVnode) { return 0; } \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 87ce471de9..75079d50b2 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -15,64 +15,13 @@ #include "vnodeInt.h" -static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg); - int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, (void **)&pVnode->pQuery, &pVnode->msgCb); } void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); } -int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { - vTrace("message in query queue is processing"); - SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config}; - - switch (pMsg->msgType) { - case TDMT_VND_QUERY: - return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg); - case TDMT_VND_QUERY_CONTINUE: - return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg); - default: - vError("unknown msg type:%d in query queue", pMsg->msgType); - return TSDB_CODE_VND_APP_ERROR; - } -} - -int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { - vTrace("message in fetch queue is processing"); - char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - switch (pMsg->msgType) { - case TDMT_VND_FETCH: - return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg); - case TDMT_VND_FETCH_RSP: - return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg); - case TDMT_VND_RES_READY: - return qWorkerProcessReadyMsg(pVnode, pVnode->pQuery, pMsg); - case TDMT_VND_TASKS_STATUS: - return qWorkerProcessStatusMsg(pVnode, pVnode->pQuery, pMsg); - case TDMT_VND_CANCEL_TASK: - return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg); - case TDMT_VND_DROP_TASK: - return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg); - case TDMT_VND_TABLE_META: - return vnodeGetTableMeta(pVnode, pMsg); - case TDMT_VND_CONSUME: - return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId); - case TDMT_VND_TASK_PIPE_EXEC: - case TDMT_VND_TASK_MERGE_EXEC: - return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0); - case TDMT_VND_STREAM_TRIGGER: - return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0); - case TDMT_VND_QUERY_HEARTBEAT: - return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg); - default: - vError("unknown msg type:%d in fetch queue", pMsg->msgType); - return TSDB_CODE_VND_APP_ERROR; - } -} - -static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { +int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { STbCfg *pTbCfg = NULL; STbCfg *pStbCfg = NULL; tb_uid_t uid; @@ -200,3 +149,19 @@ _exit: tmsgSendRsp(&rpcMsg); return TSDB_CODE_SUCCESS; } + +int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { + pLoad->vgId = pVnode->vgId; + pLoad->role = TAOS_SYNC_STATE_LEADER; + pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); + pLoad->numOfTimeSeries = 400; + pLoad->totalStorage = 300; + pLoad->compStorage = 200; + pLoad->pointsWritten = 100; + pLoad->numOfSelectReqs = 1; + pLoad->numOfInsertReqs = 3; + pLoad->numOfInsertSuccessReqs = 2; + pLoad->numOfBatchInsertReqs = 5; + pLoad->numOfBatchInsertSuccessReqs = 4; + return 0; +} diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c new file mode 100644 index 0000000000..395f715b8f --- /dev/null +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -0,0 +1,302 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "vnodeInt.h" + +static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq); +static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg **pRsp); +static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq); + +void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs) { + SNodeMsg *pMsg; + SRpcMsg *pRpc; + + for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { + pMsg = *(SNodeMsg **)taosArrayGet(pMsgs, i); + pRpc = &pMsg->rpcMsg; + + // set request version + void *pBuf = POINTER_SHIFT(pRpc->pCont, sizeof(SMsgHead)); + int64_t ver = pVnode->state.processed++; + taosEncodeFixedI64(&pBuf, ver); + + if (walWrite(pVnode->pWal, ver, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) { + // TODO: handle error + /*ASSERT(false);*/ + vError("vnode:%d write wal error since %s", pVnode->vgId, terrstr()); + } + } + + walFsync(pVnode->pWal, false); +} + +int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { + void *ptr = NULL; + int ret; + + if (pVnode->config.streamMode == 0) { + ptr = vnodeMalloc(pVnode, pMsg->contLen); + if (ptr == NULL) { + // TODO: handle error + } + + // TODO: copy here need to be extended + memcpy(ptr, pMsg->pCont, pMsg->contLen); + } + + // todo: change the interface here + int64_t ver; + taosDecodeFixedI64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver); + if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, ver) < 0) { + // TODO: handle error + } + + switch (pMsg->msgType) { + case TDMT_VND_CREATE_STB: + ret = vnodeProcessCreateStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))); + return 0; + case TDMT_VND_CREATE_TABLE: + return vnodeProcessCreateTbReq(pVnode, pMsg, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pRsp); + case TDMT_VND_ALTER_STB: + return vnodeProcessAlterStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))); + case TDMT_VND_DROP_STB: + vTrace("vgId:%d, process drop stb req", pVnode->vgId); + break; + case TDMT_VND_DROP_TABLE: + break; + case TDMT_VND_SUBMIT: + /*printf("vnode %d write data %ld\n", pVnode->vgId, ver);*/ + if (pVnode->config.streamMode == 0) { + if (tsdbInsertData(pVnode->pTsdb, (SSubmitReq *)ptr, NULL) < 0) { + // TODO: handle error + } + } + break; + case TDMT_VND_MQ_SET_CONN: { + if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { + // TODO: handle error + } + } break; + case TDMT_VND_MQ_REB: { + if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { + } + } break; + case TDMT_VND_MQ_CANCEL_CONN: { + if (tqProcessCancelConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { + } + } break; + case TDMT_VND_TASK_DEPLOY: { + if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), + pMsg->contLen - sizeof(SMsgHead)) < 0) { + } + } break; + case TDMT_VND_TASK_WRITE_EXEC: { + if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead), + 0) < 0) { + } + } break; + case TDMT_VND_CREATE_SMA: { // timeRangeSMA + + if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { + // TODO + } + // } break; + // case TDMT_VND_CANCEL_SMA: { // timeRangeSMA + // } break; + // case TDMT_VND_DROP_SMA: { // timeRangeSMA + // if (tsdbDropTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { + // // TODO + // } + + } break; + default: + ASSERT(0); + break; + } + + pVnode->state.applied = ver; + + // Check if it needs to commit + if (vnodeShouldCommit(pVnode)) { + // tsem_wait(&(pVnode->canCommit)); + if (vnodeAsyncCommit(pVnode) < 0) { + // TODO: handle error + } + } + + return 0; +} + +int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { + vTrace("message in query queue is processing"); + SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config}; + + switch (pMsg->msgType) { + case TDMT_VND_QUERY: + return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg); + case TDMT_VND_QUERY_CONTINUE: + return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg); + default: + vError("unknown msg type:%d in query queue", pMsg->msgType); + return TSDB_CODE_VND_APP_ERROR; + } +} + +int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { + vTrace("message in fetch queue is processing"); + char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + switch (pMsg->msgType) { + case TDMT_VND_FETCH: + return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg); + case TDMT_VND_FETCH_RSP: + return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg); + case TDMT_VND_RES_READY: + return qWorkerProcessReadyMsg(pVnode, pVnode->pQuery, pMsg); + case TDMT_VND_TASKS_STATUS: + return qWorkerProcessStatusMsg(pVnode, pVnode->pQuery, pMsg); + case TDMT_VND_CANCEL_TASK: + return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg); + case TDMT_VND_DROP_TASK: + return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg); + case TDMT_VND_TABLE_META: + return vnodeGetTableMeta(pVnode, pMsg); + case TDMT_VND_CONSUME: + return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId); + case TDMT_VND_TASK_PIPE_EXEC: + case TDMT_VND_TASK_MERGE_EXEC: + return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0); + case TDMT_VND_STREAM_TRIGGER: + return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0); + case TDMT_VND_QUERY_HEARTBEAT: + return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg); + default: + vError("unknown msg type:%d in fetch queue", pMsg->msgType); + return TSDB_CODE_VND_APP_ERROR; + } +} + +// TODO: remove the function +void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { + // TODO + + // blockDebugShowData(data); + tsdbInsertTSmaData(((SVnode *)pVnode)->pTsdb, smaId, (const char *)data); +} + +int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { + /*vInfo("sync message is processed");*/ + return 0; +} + +static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq) { + SVCreateTbReq vCreateTbReq = {0}; + tDeserializeSVCreateTbReq(pReq, &vCreateTbReq); + if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) { + // TODO + return -1; + } + + taosMemoryFree(vCreateTbReq.stbCfg.pSchema); + taosMemoryFree(vCreateTbReq.stbCfg.pTagSchema); + if (vCreateTbReq.stbCfg.pRSmaParam) { + taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam->pFuncIds); + taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam); + } + taosMemoryFree(vCreateTbReq.dbFName); + taosMemoryFree(vCreateTbReq.name); + + return 0; +} + +static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg **pRsp) { + SVCreateTbBatchReq vCreateTbBatchReq = {0}; + SVCreateTbBatchRsp vCreateTbBatchRsp = {0}; + tDeserializeSVCreateTbBatchReq(pReq, &vCreateTbBatchReq); + int reqNum = taosArrayGetSize(vCreateTbBatchReq.pArray); + for (int i = 0; i < reqNum; i++) { + SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i); + + char tableFName[TSDB_TABLE_FNAME_LEN]; + SMsgHead *pHead = (SMsgHead *)pMsg->pCont; + sprintf(tableFName, "%s.%s", pCreateTbReq->dbFName, pCreateTbReq->name); + + int32_t code = vnodeValidateTableHash(&pVnode->config, tableFName); + if (code) { + SVCreateTbRsp rsp; + rsp.code = code; + + taosArrayPush(vCreateTbBatchRsp.rspList, &rsp); + } + + if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) { + // TODO: handle error + vError("vgId:%d, failed to create table: %s", pVnode->vgId, pCreateTbReq->name); + } + // TODO: to encapsule a free API + taosMemoryFree(pCreateTbReq->name); + taosMemoryFree(pCreateTbReq->dbFName); + if (pCreateTbReq->type == TD_SUPER_TABLE) { + taosMemoryFree(pCreateTbReq->stbCfg.pSchema); + taosMemoryFree(pCreateTbReq->stbCfg.pTagSchema); + if (pCreateTbReq->stbCfg.pRSmaParam) { + taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam->pFuncIds); + taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam); + } + } else if (pCreateTbReq->type == TD_CHILD_TABLE) { + taosMemoryFree(pCreateTbReq->ctbCfg.pTag); + } else { + taosMemoryFree(pCreateTbReq->ntbCfg.pSchema); + if (pCreateTbReq->ntbCfg.pRSmaParam) { + taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam->pFuncIds); + taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam); + } + } + } + + vTrace("vgId:%d process create %" PRIzu " tables", pVnode->vgId, taosArrayGetSize(vCreateTbBatchReq.pArray)); + taosArrayDestroy(vCreateTbBatchReq.pArray); + if (vCreateTbBatchRsp.rspList) { + int32_t contLen = tSerializeSVCreateTbBatchRsp(NULL, 0, &vCreateTbBatchRsp); + void *msg = rpcMallocCont(contLen); + tSerializeSVCreateTbBatchRsp(msg, contLen, &vCreateTbBatchRsp); + taosArrayDestroy(vCreateTbBatchRsp.rspList); + + *pRsp = taosMemoryCalloc(1, sizeof(SRpcMsg)); + (*pRsp)->msgType = TDMT_VND_CREATE_TABLE_RSP; + (*pRsp)->pCont = msg; + (*pRsp)->contLen = contLen; + (*pRsp)->handle = pMsg->handle; + (*pRsp)->ahandle = pMsg->ahandle; + } + + return 0; +} + +static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq) { + SVCreateTbReq vAlterTbReq = {0}; + vTrace("vgId:%d, process alter stb req", pVnode->vgId); + tDeserializeSVCreateTbReq(pReq, &vAlterTbReq); + // TODO: to encapsule a free API + taosMemoryFree(vAlterTbReq.stbCfg.pSchema); + taosMemoryFree(vAlterTbReq.stbCfg.pTagSchema); + if (vAlterTbReq.stbCfg.pRSmaParam) { + taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam->pFuncIds); + taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam); + } + taosMemoryFree(vAlterTbReq.dbFName); + taosMemoryFree(vAlterTbReq.name); + return 0; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 24b5d4bae5..16b881f00d 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -15,277 +15,4 @@ #include "vnodeInt.h" -void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { - // TODO - - // blockDebugShowData(data); - tsdbInsertTSmaData(((SVnode *)pVnode)->pTsdb, smaId, (const char *)data); -} - -void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { - SNodeMsg *pMsg; - SRpcMsg *pRpc; - - for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { - pMsg = *(SNodeMsg **)taosArrayGet(pMsgs, i); - pRpc = &pMsg->rpcMsg; - - // set request version - void *pBuf = POINTER_SHIFT(pRpc->pCont, sizeof(SMsgHead)); - int64_t ver = pVnode->state.processed++; - taosEncodeFixedI64(&pBuf, ver); - - if (walWrite(pVnode->pWal, ver, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) { - // TODO: handle error - /*ASSERT(false);*/ - vError("vnode:%d write wal error since %s", pVnode->vgId, terrstr()); - } - } - - walFsync(pVnode->pWal, false); - - // TODO: Integrate RAFT module here - - // No results are returned because error handling is difficult - // return 0; -} - -int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - void *ptr = NULL; - - if (pVnode->config.streamMode == 0) { - ptr = vnodeMalloc(pVnode, pMsg->contLen); - if (ptr == NULL) { - // TODO: handle error - } - - // TODO: copy here need to be extended - memcpy(ptr, pMsg->pCont, pMsg->contLen); - } - - // todo: change the interface here - int64_t ver; - taosDecodeFixedI64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver); - if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, ver) < 0) { - // TODO: handle error - } - - switch (pMsg->msgType) { - case TDMT_VND_CREATE_STB: { - SVCreateTbReq vCreateTbReq = {0}; - tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq); - if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) { - // TODO: handle error - } - - // TODO: to encapsule a free API - taosMemoryFree(vCreateTbReq.stbCfg.pSchema); - taosMemoryFree(vCreateTbReq.stbCfg.pTagSchema); - if (vCreateTbReq.stbCfg.pRSmaParam) { - taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam->pFuncIds); - taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam); - } - taosMemoryFree(vCreateTbReq.dbFName); - taosMemoryFree(vCreateTbReq.name); - break; - } - case TDMT_VND_CREATE_TABLE: { - SVCreateTbBatchReq vCreateTbBatchReq = {0}; - SVCreateTbBatchRsp vCreateTbBatchRsp = {0}; - tDeserializeSVCreateTbBatchReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq); - int reqNum = taosArrayGetSize(vCreateTbBatchReq.pArray); - for (int i = 0; i < reqNum; i++) { - SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i); - - char tableFName[TSDB_TABLE_FNAME_LEN]; - SMsgHead *pHead = (SMsgHead *)pMsg->pCont; - sprintf(tableFName, "%s.%s", pCreateTbReq->dbFName, pCreateTbReq->name); - - int32_t code = vnodeValidateTableHash(&pVnode->config, tableFName); - if (code) { - SVCreateTbRsp rsp; - rsp.code = code; - - taosArrayPush(vCreateTbBatchRsp.rspList, &rsp); - } - - if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) { - // TODO: handle error - vError("vgId:%d, failed to create table: %s", pVnode->vgId, pCreateTbReq->name); - } - // TODO: to encapsule a free API - taosMemoryFree(pCreateTbReq->name); - taosMemoryFree(pCreateTbReq->dbFName); - if (pCreateTbReq->type == TD_SUPER_TABLE) { - taosMemoryFree(pCreateTbReq->stbCfg.pSchema); - taosMemoryFree(pCreateTbReq->stbCfg.pTagSchema); - if (pCreateTbReq->stbCfg.pRSmaParam) { - taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam->pFuncIds); - taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam); - } - } else if (pCreateTbReq->type == TD_CHILD_TABLE) { - taosMemoryFree(pCreateTbReq->ctbCfg.pTag); - } else { - taosMemoryFree(pCreateTbReq->ntbCfg.pSchema); - if (pCreateTbReq->ntbCfg.pRSmaParam) { - taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam->pFuncIds); - taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam); - } - } - } - - vTrace("vgId:%d process create %" PRIzu " tables", pVnode->vgId, taosArrayGetSize(vCreateTbBatchReq.pArray)); - taosArrayDestroy(vCreateTbBatchReq.pArray); - if (vCreateTbBatchRsp.rspList) { - int32_t contLen = tSerializeSVCreateTbBatchRsp(NULL, 0, &vCreateTbBatchRsp); - void *msg = rpcMallocCont(contLen); - tSerializeSVCreateTbBatchRsp(msg, contLen, &vCreateTbBatchRsp); - taosArrayDestroy(vCreateTbBatchRsp.rspList); - - *pRsp = taosMemoryCalloc(1, sizeof(SRpcMsg)); - (*pRsp)->msgType = TDMT_VND_CREATE_TABLE_RSP; - (*pRsp)->pCont = msg; - (*pRsp)->contLen = contLen; - (*pRsp)->handle = pMsg->handle; - (*pRsp)->ahandle = pMsg->ahandle; - } - break; - } - case TDMT_VND_ALTER_STB: { - SVCreateTbReq vAlterTbReq = {0}; - vTrace("vgId:%d, process alter stb req", pVnode->vgId); - tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vAlterTbReq); - // TODO: to encapsule a free API - taosMemoryFree(vAlterTbReq.stbCfg.pSchema); - taosMemoryFree(vAlterTbReq.stbCfg.pTagSchema); - if (vAlterTbReq.stbCfg.pRSmaParam) { - taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam->pFuncIds); - taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam); - } - taosMemoryFree(vAlterTbReq.dbFName); - taosMemoryFree(vAlterTbReq.name); - break; - } - case TDMT_VND_DROP_STB: - vTrace("vgId:%d, process drop stb req", pVnode->vgId); - break; - case TDMT_VND_DROP_TABLE: - // if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) { - // // TODO: handle error - // } - break; - case TDMT_VND_SUBMIT: - /*printf("vnode %d write data %ld\n", pVnode->vgId, ver);*/ - if (pVnode->config.streamMode == 0) { - if (tsdbInsertData(pVnode->pTsdb, (SSubmitReq *)ptr, NULL) < 0) { - // TODO: handle error - } - } - break; - case TDMT_VND_MQ_SET_CONN: { - if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { - // TODO: handle error - } - } break; - case TDMT_VND_MQ_REB: { - if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { - } - } break; - case TDMT_VND_MQ_CANCEL_CONN: { - if (tqProcessCancelConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { - } - } break; - case TDMT_VND_TASK_DEPLOY: { - if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), - pMsg->contLen - sizeof(SMsgHead)) < 0) { - } - } break; - case TDMT_VND_TASK_WRITE_EXEC: { - if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead), - 0) < 0) { - } - } break; - case TDMT_VND_CREATE_SMA: { // timeRangeSMA -#if 0 - - SSmaCfg vCreateSmaReq = {0}; - if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - vWarn("vgId:%d TDMT_VND_CREATE_SMA received but deserialize failed since %s", pVnode->config.vgId, - terrstr(terrno)); - return -1; - } - vDebug("vgId:%d TDMT_VND_CREATE_SMA msg received for %s:%" PRIi64, pVnode->config.vgId, - vCreateSmaReq.tSma.indexName, vCreateSmaReq.tSma.indexUid); - - // record current timezone of server side - vCreateSmaReq.tSma.timezoneInt = tsTimezone; - - if (metaCreateTSma(pVnode->pMeta, &vCreateSmaReq) < 0) { - // TODO: handle error - tdDestroyTSma(&vCreateSmaReq.tSma); - return -1; - } - - tsdbTSmaAdd(pVnode->pTsdb, 1); - - tdDestroyTSma(&vCreateSmaReq.tSma); - // TODO: return directly or go on follow steps? -#endif - if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { - // TODO - } - // } break; - // case TDMT_VND_CANCEL_SMA: { // timeRangeSMA - // } break; - // case TDMT_VND_DROP_SMA: { // timeRangeSMA - // if (tsdbDropTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { - // // TODO - // } -#if 0 - tsdbTSmaSub(pVnode->pTsdb, 1); - SVDropTSmaReq vDropSmaReq = {0}; - if (tDeserializeSVDropTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vDropSmaReq) == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - // TODO: send msg to stream computing to drop tSma - // if ((send msg to stream computing) < 0) { - // tdDestroyTSma(&vCreateSmaReq); - // return -1; - // } - // - - if (metaDropTSma(pVnode->pMeta, vDropSmaReq.indexUid) < 0) { - // TODO: handle error - return -1; - } - - if(tsdbDropTSmaData(pVnode->pTsdb, vDropSmaReq.indexUid) < 0) { - // TODO: handle error - return -1; - } - - // TODO: return directly or go on follow steps? -#endif - } break; - default: - ASSERT(0); - break; - } - - pVnode->state.applied = ver; - - // Check if it needs to commit - if (vnodeShouldCommit(pVnode)) { - // tsem_wait(&(pVnode->canCommit)); - if (vnodeAsyncCommit(pVnode) < 0) { - // TODO: handle error - } - } - - return 0; -} - /* ------------------------ STATIC METHODS ------------------------ */