Merge branch 'develop' into xiaoping/add_test_case2
This commit is contained in:
commit
a387e6ff50
|
@ -221,7 +221,7 @@ TDengine采用时间驱动缓存管理策略(First-In-First-Out,FIFO),
|
|||
|
||||
TDengine通过查询函数向用户提供毫秒级的数据获取能力。直接将最近到达的数据保存在缓存中,可以更加快速地响应用户针对最近一条或一批数据的查询分析,整体上提供更快的数据库查询响应能力。从这个意义上来说,**可通过设置合适的配置参数将TDengine作为数据缓存来使用,而不需要再部署Redis或其他额外的缓存系统**,可有效地简化系统架构,降低运维的成本。需要注意的是,TDengine重启以后系统的缓存将被清空,之前缓存的数据均会被批量写入磁盘,缓存的数据将不会像专门的Key-value缓存系统再将之前缓存的数据重新加载到缓存中。
|
||||
|
||||
每个vnode有自己独立的内存,而且由多个固定大小的内存块组成,不同vnode之间完全隔离。数据写入时,类似于日志的写法,数据被顺序追加写入内存,但每个vnode维护有自己的skip list,便于迅速查找。当一半以上的内存块写满时,启动落盘操作,而且后续写的操作在新的内存块进行。这样,一个vnode里有一半内存块是保留有最近的数据的,以达到缓存、快速查找的目的。一个vnode的内存块的个数由配置参数blocks决定,内存块的大小由配置参数cache决定。
|
||||
每个vnode有自己独立的内存,而且由多个固定大小的内存块组成,不同vnode之间完全隔离。数据写入时,类似于日志的写法,数据被顺序追加写入内存,但每个vnode维护有自己的skip list,便于迅速查找。当三分之一以上的内存块写满时,启动落盘操作,而且后续写的操作在新的内存块进行。这样,一个vnode里有三分之一内存块是保留有最近的数据的,以达到缓存、快速查找的目的。一个vnode的内存块的个数由配置参数blocks决定,内存块的大小由配置参数cache决定。
|
||||
|
||||
### 持久化存储
|
||||
TDengine采用数据驱动的方式让缓存中的数据写入硬盘进行持久化存储。当vnode中缓存的数据达到一定规模时,为了不阻塞后续数据的写入,TDengine也会拉起落盘线程将缓存的数据写入持久化存储。TDengine在数据落盘时会打开新的数据库日志文件,在落盘成功后则会删除老的数据库日志文件,避免日志文件无限制的增长。
|
||||
|
|
|
@ -571,8 +571,8 @@ static void balanceCheckDnodeAccess() {
|
|||
if (pDnode->status != TAOS_DN_STATUS_DROPPING && pDnode->status != TAOS_DN_STATUS_OFFLINE) {
|
||||
pDnode->status = TAOS_DN_STATUS_OFFLINE;
|
||||
pDnode->offlineReason = TAOS_DN_OFF_STATUS_MSG_TIMEOUT;
|
||||
mInfo("dnode:%d, set to offline state, access seq:%d, last seq:%d", pDnode->dnodeId, tsAccessSquence,
|
||||
pDnode->lastAccess);
|
||||
mInfo("dnode:%d, set to offline state, access seq:%d last seq:%d laststat:%d", pDnode->dnodeId, tsAccessSquence,
|
||||
pDnode->lastAccess, pDnode->status);
|
||||
balanceSetVgroupOffline(pDnode);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,6 +44,7 @@ extern int32_t tsMaxShellConns;
|
|||
extern int32_t tsShellActivityTimer;
|
||||
extern uint32_t tsMaxTmrCtrl;
|
||||
extern float tsNumOfThreadsPerCore;
|
||||
extern int32_t tsNumOfCommitThreads;
|
||||
extern float tsRatioOfQueryThreads; // todo remove it
|
||||
extern int8_t tsDaylight;
|
||||
extern char tsTimezone[];
|
||||
|
|
|
@ -51,6 +51,7 @@ int32_t tsMaxShellConns = 5000;
|
|||
int32_t tsMaxConnections = 5000;
|
||||
int32_t tsShellActivityTimer = 3; // second
|
||||
float tsNumOfThreadsPerCore = 1.0f;
|
||||
int32_t tsNumOfCommitThreads = 1;
|
||||
float tsRatioOfQueryThreads = 0.5f;
|
||||
int8_t tsDaylight = 0;
|
||||
char tsTimezone[TSDB_TIMEZONE_LEN] = {0};
|
||||
|
@ -426,6 +427,16 @@ static void doInitGlobalConfig(void) {
|
|||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
cfg.option = "numOfCommitThreads";
|
||||
cfg.ptr = &tsNumOfCommitThreads;
|
||||
cfg.valType = TAOS_CFG_VTYPE_INT32;
|
||||
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
|
||||
cfg.minValue = 1;
|
||||
cfg.maxValue = 100;
|
||||
cfg.ptrLength = 0;
|
||||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
cfg.option = "ratioOfQueryThreads";
|
||||
cfg.ptr = &tsRatioOfQueryThreads;
|
||||
cfg.valType = TAOS_CFG_VTYPE_FLOAT;
|
||||
|
|
|
@ -77,15 +77,15 @@ void dnodeUpdateMInfos(SMnodeInfos *minfos) {
|
|||
|
||||
void dnodeUpdateEpSetForPeer(SRpcEpSet *ep) {
|
||||
if (ep->numOfEps <= 0) {
|
||||
dError("mnode EP list for peer is changed, but content is invalid, discard it");
|
||||
dError("minfos is changed, but content is invalid, discard it");
|
||||
return;
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&tsMInfosMutex);
|
||||
dInfo("mnode EP list for peer is changed, numOfEps:%d inUse:%d", ep->numOfEps, ep->inUse);
|
||||
dInfo("minfos is changed, numOfEps:%d inUse:%d", ep->numOfEps, ep->inUse);
|
||||
for (int i = 0; i < ep->numOfEps; ++i) {
|
||||
ep->port[i] -= TSDB_PORT_DNODEDNODE;
|
||||
dInfo("mnode index:%d %s:%u", i, ep->fqdn[i], ep->port[i]);
|
||||
dInfo("minfo:%d %s:%u", i, ep->fqdn[i], ep->port[i]);
|
||||
}
|
||||
tsMEpSet = *ep;
|
||||
pthread_mutex_unlock(&tsMInfosMutex);
|
||||
|
|
|
@ -431,6 +431,8 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
|
|||
#define TSDB_PORT_HTTP 11
|
||||
#define TSDB_PORT_ARBITRATOR 12
|
||||
|
||||
#define TSDB_MAX_WAL_SIZE (1024*1024)
|
||||
|
||||
typedef enum {
|
||||
TAOS_QTYPE_RPC = 0,
|
||||
TAOS_QTYPE_FWD = 1,
|
||||
|
|
|
@ -237,7 +237,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_READY, 0, 0x0707, "Query not
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_HAS_RSP, 0, 0x0708, "Query should response")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_IN_EXEC, 0, 0x0709, "Multiple retrieval of this query")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW, 0, 0x070A, "Too many time window in query")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, 0, 0x070B, "Query buffer limit has reached")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, 0, 0x070B, "Query buffer limit has reached")
|
||||
|
||||
// grant
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, 0, 0x0800, "License expired")
|
||||
|
@ -261,6 +261,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sy
|
|||
// wal
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, 0, 0x1001, "WAL file is corrupted")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, 0, 0x1002, "WAL size exceeds limit")
|
||||
|
||||
// http
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_SERVER_OFFLINE, 0, 0x1100, "http server is not onlin")
|
||||
|
|
|
@ -321,6 +321,10 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle);
|
|||
*/
|
||||
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage);
|
||||
|
||||
int tsdbInitCommitQueue(int nthreads);
|
||||
void tsdbDestroyCommitQueue();
|
||||
int tsdbSyncCommit(TSDB_REPO_T *repo);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -54,15 +54,17 @@ typedef int32_t FWalWrite(void *ahandle, void *pHead, int32_t qtype, void *pMsg)
|
|||
int32_t walInit();
|
||||
void walCleanUp();
|
||||
|
||||
twalh walOpen(char *path, SWalCfg *pCfg);
|
||||
int32_t walAlter(twalh pWal, SWalCfg *pCfg);
|
||||
void walStop(twalh);
|
||||
void walClose(twalh);
|
||||
int32_t walRenew(twalh);
|
||||
int32_t walWrite(twalh, SWalHead *);
|
||||
void walFsync(twalh, bool forceFsync);
|
||||
int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp);
|
||||
int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId);
|
||||
twalh walOpen(char *path, SWalCfg *pCfg);
|
||||
int32_t walAlter(twalh pWal, SWalCfg *pCfg);
|
||||
void walStop(twalh);
|
||||
void walClose(twalh);
|
||||
int32_t walRenew(twalh);
|
||||
void walRemoveOneOldFile(twalh);
|
||||
void walRemoveAllOldFiles(twalh);
|
||||
int32_t walWrite(twalh, SWalHead *);
|
||||
void walFsync(twalh, bool forceFsync);
|
||||
int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp);
|
||||
int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId);
|
||||
uint64_t walGetVersion(twalh);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -584,7 +584,7 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
|
|||
return TSDB_CODE_MND_CLUSTER_CFG_INCONSISTENT;
|
||||
}
|
||||
|
||||
mDebug("dnode:%d, from offline to online", pDnode->dnodeId);
|
||||
mInfo("dnode:%d, from offline to online", pDnode->dnodeId);
|
||||
pDnode->status = TAOS_DN_STATUS_READY;
|
||||
pDnode->offlineReason = TAOS_DN_OFF_ONLINE;
|
||||
balanceSyncNotify();
|
||||
|
|
|
@ -63,9 +63,9 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
|
|||
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
|
||||
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) {
|
||||
epSet->inUse = (i + 1) % epSet->numOfEps;
|
||||
mDebug("mnode index:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
|
||||
mDebug("mpeer:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
|
||||
} else {
|
||||
mDebug("mnode index:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
|
||||
mDebug("mpeer:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -7233,7 +7233,7 @@ void qCleanupQueryMgmt(void* pQMgmt) {
|
|||
pthread_mutex_destroy(&pQueryMgmt->lock);
|
||||
tfree(pQueryMgmt);
|
||||
|
||||
qDebug("vgId:%d queryMgmt cleanup completed", vgId);
|
||||
qDebug("vgId:%d, queryMgmt cleanup completed", vgId);
|
||||
}
|
||||
|
||||
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
|
||||
|
|
|
@ -35,6 +35,8 @@ extern "C" {
|
|||
#define TAOS_SMSG_SYNC_MUST 6
|
||||
#define TAOS_SMSG_STATUS 7
|
||||
|
||||
#define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16)
|
||||
|
||||
#define nodeRole pNode->peerInfo[pNode->selfIndex]->role
|
||||
#define nodeVersion pNode->peerInfo[pNode->selfIndex]->version
|
||||
#define nodeSStatus pNode->peerInfo[pNode->selfIndex]->sstatus
|
||||
|
|
|
@ -79,7 +79,7 @@ int32_t syncInit() {
|
|||
info.numOfThreads = tsSyncTcpThreads;
|
||||
info.serverIp = 0;
|
||||
info.port = tsSyncPort;
|
||||
info.bufferSize = 640000;
|
||||
info.bufferSize = SYNC_MAX_SIZE;
|
||||
info.processBrokenLink = syncProcessBrokenLink;
|
||||
info.processIncomingMsg = syncProcessPeerMsg;
|
||||
info.processIncomingConn = syncProcessIncommingConnection;
|
||||
|
@ -486,7 +486,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
|
|||
pPeer->ip = ip;
|
||||
pPeer->port = pInfo->nodePort;
|
||||
pPeer->fqdn[sizeof(pPeer->fqdn) - 1] = 0;
|
||||
snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d peer:%s:%u", pNode->vgId, pPeer->fqdn, pPeer->port);
|
||||
snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d, peer:%s:%u", pNode->vgId, pPeer->fqdn, pPeer->port);
|
||||
|
||||
pPeer->peerFd = -1;
|
||||
pPeer->syncFd = -1;
|
||||
|
@ -850,7 +850,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
|
|||
SSyncNode *pNode = pPeer->pSyncNode;
|
||||
SWalHead * pHead = (SWalHead *)cont;
|
||||
|
||||
sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version);
|
||||
sDebug("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);
|
||||
|
||||
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
|
||||
// nodeVersion = pHead->version;
|
||||
|
@ -859,7 +859,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
|
|||
if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
|
||||
syncSaveIntoBuffer(pPeer, pHead);
|
||||
} else {
|
||||
sError("%s, forward discarded, ver:%" PRIu64, pPeer->id, pHead->version);
|
||||
sError("%s, forward discarded, hver:%" PRIu64, pPeer->id, pHead->version);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -890,10 +890,11 @@ static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
|
|||
|
||||
// head.len = htonl(head.len);
|
||||
if (pHead->len < 0) {
|
||||
sError("%s, invalid pkt length, len:%d", pPeer->id, pHead->len);
|
||||
sError("%s, invalid pkt length, hlen:%d", pPeer->id, pHead->len);
|
||||
return -1;
|
||||
}
|
||||
|
||||
assert(pHead->len <= TSDB_MAX_WAL_SIZE);
|
||||
int32_t bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len);
|
||||
if (bytes != pHead->len) {
|
||||
sError("%s, failed to read, bytes:%d len:%d", pPeer->id, bytes, pHead->len);
|
||||
|
|
|
@ -244,7 +244,7 @@ static int32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) {
|
|||
}
|
||||
|
||||
static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) {
|
||||
SWalHead *pHead = malloc(640000);
|
||||
SWalHead *pHead = malloc(SYNC_MAX_SIZE);
|
||||
int32_t code = -1;
|
||||
int32_t bytes = 0;
|
||||
int32_t sfd;
|
||||
|
|
|
@ -86,7 +86,7 @@ int32_t main(int32_t argc, char *argv[]) {
|
|||
info.numOfThreads = 1;
|
||||
info.serverIp = 0;
|
||||
info.port = tsArbitratorPort;
|
||||
info.bufferSize = 640000;
|
||||
info.bufferSize = SYNC_MAX_SIZE;
|
||||
info.processBrokenLink = arbProcessBrokenLink;
|
||||
info.processIncomingMsg = arbProcessPeerMsg;
|
||||
info.processIncomingConn = arbProcessIncommingConnection;
|
||||
|
@ -128,7 +128,7 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
|
|||
}
|
||||
|
||||
firstPkt.fqdn[sizeof(firstPkt.fqdn) - 1] = 0;
|
||||
snprintf(pNode->id, sizeof(pNode->id), "vgId:%d peer:%s:%d", firstPkt.sourceId, firstPkt.fqdn, firstPkt.port);
|
||||
snprintf(pNode->id, sizeof(pNode->id), "vgId:%d, peer:%s:%d", firstPkt.sourceId, firstPkt.fqdn, firstPkt.port);
|
||||
if (firstPkt.syncHead.vgId) {
|
||||
sDebug("%s, vgId in head is not zero, close the connection", pNode->id);
|
||||
tfree(pNode);
|
||||
|
|
|
@ -220,8 +220,7 @@ typedef struct {
|
|||
SMemTable* mem;
|
||||
SMemTable* imem;
|
||||
STsdbFileH* tsdbFileH;
|
||||
int commit;
|
||||
pthread_t commitThread;
|
||||
sem_t readyToCommit;
|
||||
pthread_mutex_t mutex;
|
||||
bool repoLocked;
|
||||
} STsdbRepo;
|
||||
|
@ -440,6 +439,7 @@ void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
|
|||
int tsdbAsyncCommit(STsdbRepo* pRepo);
|
||||
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
|
||||
TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo);
|
||||
void* tsdbCommitData(STsdbRepo* pRepo);
|
||||
|
||||
static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) {
|
||||
if (pIter == NULL) return NULL;
|
||||
|
@ -588,6 +588,9 @@ int tsdbScanSCompBlock(STsdbScanHandle* pScanHandle, int idx);
|
|||
int tsdbCloseScanFile(STsdbScanHandle* pScanHandle);
|
||||
void tsdbFreeScanHandle(STsdbScanHandle* pScanHandle);
|
||||
|
||||
// ------------------ tsdbCommitQueue.c
|
||||
int tsdbScheduleCommit(STsdbRepo *pRepo);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -110,7 +110,7 @@ void tsdbCloseBufPool(STsdbRepo *pRepo) {
|
|||
}
|
||||
}
|
||||
|
||||
tsdbDebug("vgId:%d buffer pool is closed", REPO_ID(pRepo));
|
||||
tsdbDebug("vgId:%d, buffer pool is closed", REPO_ID(pRepo));
|
||||
}
|
||||
|
||||
SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
|
||||
|
@ -134,7 +134,7 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
|
|||
pBufBlock->offset = 0;
|
||||
pBufBlock->remain = pBufPool->bufBlockSize;
|
||||
|
||||
tsdbDebug("vgId:%d buffer block is allocated, blockId:%" PRId64, REPO_ID(pRepo), pBufBlock->blockId);
|
||||
tsdbDebug("vgId:%d, buffer block is allocated, blockId:%" PRId64, REPO_ID(pRepo), pBufBlock->blockId);
|
||||
return pNode;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,147 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "os.h"
|
||||
#include "tlist.h"
|
||||
#include "tsdbMain.h"
|
||||
|
||||
typedef struct {
|
||||
bool stop;
|
||||
pthread_mutex_t lock;
|
||||
pthread_cond_t queueNotEmpty;
|
||||
int nthreads;
|
||||
SList * queue;
|
||||
pthread_t * threads;
|
||||
} SCommitQueue;
|
||||
|
||||
typedef struct {
|
||||
STsdbRepo *pRepo;
|
||||
} SCommitReq;
|
||||
|
||||
static void *tsdbLoopCommit(void *arg);
|
||||
|
||||
SCommitQueue tsCommitQueue = {0};
|
||||
|
||||
int tsdbInitCommitQueue(int nthreads) {
|
||||
SCommitQueue *pQueue = &tsCommitQueue;
|
||||
|
||||
if (nthreads < 1) nthreads = 1;
|
||||
|
||||
pQueue->stop = false;
|
||||
pQueue->nthreads = nthreads;
|
||||
|
||||
pQueue->queue = tdListNew(0);
|
||||
if (pQueue->queue == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pQueue->threads = (pthread_t *)calloc(nthreads, sizeof(pthread_t));
|
||||
if (pQueue->threads == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
tdListFree(pQueue->queue);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pthread_mutex_init(&(pQueue->lock), NULL);
|
||||
pthread_cond_init(&(pQueue->queueNotEmpty), NULL);
|
||||
|
||||
for (int i = 0; i < nthreads; i++) {
|
||||
pthread_create(pQueue->threads + i, NULL, tsdbLoopCommit, NULL);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tsdbDestroyCommitQueue() {
|
||||
SCommitQueue *pQueue = &tsCommitQueue;
|
||||
|
||||
pthread_mutex_lock(&(pQueue->lock));
|
||||
|
||||
if (pQueue->stop) {
|
||||
pthread_mutex_unlock(&(pQueue->lock));
|
||||
return;
|
||||
}
|
||||
|
||||
pQueue->stop = true;
|
||||
pthread_cond_broadcast(&(pQueue->queueNotEmpty));
|
||||
|
||||
pthread_mutex_unlock(&(pQueue->lock));
|
||||
|
||||
for (size_t i = 0; i < pQueue->nthreads; i++) {
|
||||
pthread_join(pQueue->threads[i], NULL);
|
||||
}
|
||||
|
||||
free(pQueue->threads);
|
||||
tdListFree(pQueue->queue);
|
||||
pthread_cond_destroy(&(pQueue->queueNotEmpty));
|
||||
pthread_mutex_destroy(&(pQueue->lock));
|
||||
}
|
||||
|
||||
int tsdbScheduleCommit(STsdbRepo *pRepo) {
|
||||
SCommitQueue *pQueue = &tsCommitQueue;
|
||||
|
||||
SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(SCommitReq));
|
||||
if (pNode == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
((SCommitReq *)pNode->data)->pRepo = pRepo;
|
||||
|
||||
pthread_mutex_lock(&(pQueue->lock));
|
||||
|
||||
ASSERT(!pQueue->stop);
|
||||
|
||||
tdListAppendNode(pQueue->queue, pNode);
|
||||
pthread_cond_signal(&(pQueue->queueNotEmpty));
|
||||
|
||||
pthread_mutex_unlock(&(pQueue->lock));
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void *tsdbLoopCommit(void *arg) {
|
||||
SCommitQueue *pQueue = &tsCommitQueue;
|
||||
SListNode * pNode = NULL;
|
||||
STsdbRepo * pRepo = NULL;
|
||||
|
||||
while (true) {
|
||||
pthread_mutex_lock(&(pQueue->lock));
|
||||
|
||||
while (true) {
|
||||
pNode = tdListPopHead(pQueue->queue);
|
||||
if (pNode == NULL) {
|
||||
if (pQueue->stop) {
|
||||
pthread_mutex_unlock(&(pQueue->lock));
|
||||
goto _exit;
|
||||
} else {
|
||||
pthread_cond_wait(&(pQueue->queueNotEmpty), &(pQueue->lock));
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&(pQueue->lock));
|
||||
|
||||
pRepo = ((SCommitReq *)pNode->data)->pRepo;
|
||||
|
||||
tsdbCommitData(pRepo);
|
||||
listNodeFree(pNode);
|
||||
}
|
||||
|
||||
_exit:
|
||||
return NULL;
|
||||
}
|
|
@ -163,7 +163,7 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
|
|||
|
||||
if (toCommit) {
|
||||
tsdbAsyncCommit(pRepo);
|
||||
if (pRepo->commit) pthread_join(pRepo->commitThread, NULL);
|
||||
sem_wait(&(pRepo->readyToCommit));
|
||||
}
|
||||
tsdbUnRefMemTable(pRepo, pRepo->mem);
|
||||
tsdbUnRefMemTable(pRepo, pRepo->imem);
|
||||
|
@ -675,6 +675,12 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
code = sem_init(&(pRepo->readyToCommit), 0, 1);
|
||||
if (code != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(code);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pRepo->repoLocked = false;
|
||||
|
||||
pRepo->rootDir = strdup(rootDir);
|
||||
|
@ -719,6 +725,7 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) {
|
|||
// tsdbFreeMemTable(pRepo->mem);
|
||||
// tsdbFreeMemTable(pRepo->imem);
|
||||
tfree(pRepo->rootDir);
|
||||
sem_destroy(&(pRepo->readyToCommit));
|
||||
pthread_mutex_destroy(&pRepo->mutex);
|
||||
free(pRepo);
|
||||
}
|
||||
|
|
|
@ -24,7 +24,6 @@ static void tsdbFreeMemTable(SMemTable *pMemTable);
|
|||
static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
|
||||
static void tsdbFreeTableData(STableData *pTableData);
|
||||
static char * tsdbGetTsTupleKey(const void *data);
|
||||
static void * tsdbCommitData(void *arg);
|
||||
static int tsdbCommitMeta(STsdbRepo *pRepo);
|
||||
static void tsdbEndCommit(STsdbRepo *pRepo);
|
||||
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
|
||||
|
@ -262,43 +261,31 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
|
|||
|
||||
int tsdbAsyncCommit(STsdbRepo *pRepo) {
|
||||
SMemTable *pIMem = pRepo->imem;
|
||||
int code = 0;
|
||||
|
||||
if (pIMem != NULL) {
|
||||
ASSERT(pRepo->commit);
|
||||
tsdbDebug("vgId:%d waiting for the commit thread", REPO_ID(pRepo));
|
||||
code = pthread_join(pRepo->commitThread, NULL);
|
||||
tsdbDebug("vgId:%d commit thread is finished", REPO_ID(pRepo));
|
||||
if (code != 0) {
|
||||
tsdbError("vgId:%d failed to thread join since %s", REPO_ID(pRepo), strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
pRepo->commit = 0;
|
||||
}
|
||||
|
||||
ASSERT(pRepo->commit == 0);
|
||||
if (pRepo->mem != NULL) {
|
||||
sem_wait(&(pRepo->readyToCommit));
|
||||
|
||||
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START);
|
||||
if (tsdbLockRepo(pRepo) < 0) return -1;
|
||||
pRepo->imem = pRepo->mem;
|
||||
pRepo->mem = NULL;
|
||||
pRepo->commit = 1;
|
||||
code = pthread_create(&pRepo->commitThread, NULL, tsdbCommitData, (void *)pRepo);
|
||||
if (code != 0) {
|
||||
tsdbError("vgId:%d failed to create commit thread since %s", REPO_ID(pRepo), strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(code);
|
||||
tsdbUnlockRepo(pRepo);
|
||||
return -1;
|
||||
}
|
||||
tsdbScheduleCommit(pRepo);
|
||||
if (tsdbUnlockRepo(pRepo) < 0) return -1;
|
||||
}
|
||||
|
||||
if (pIMem && tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1;
|
||||
if (tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tsdbSyncCommit(TSDB_REPO_T *repo) {
|
||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||
tsdbAsyncCommit(pRepo);
|
||||
sem_wait(&(pRepo->readyToCommit));
|
||||
sem_post(&(pRepo->readyToCommit));
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* This is an important function to load data or try to load data from memory skiplist iterator.
|
||||
*
|
||||
|
@ -419,6 +406,68 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
|
|||
return 0;
|
||||
}
|
||||
|
||||
void *tsdbCommitData(STsdbRepo *pRepo) {
|
||||
SMemTable * pMem = pRepo->imem;
|
||||
STsdbCfg * pCfg = &pRepo->config;
|
||||
SDataCols * pDataCols = NULL;
|
||||
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||
SCommitIter *iters = NULL;
|
||||
SRWHelper whelper = {0};
|
||||
ASSERT(pMem != NULL);
|
||||
|
||||
tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo),
|
||||
pMem->keyFirst, pMem->keyLast, pMem->numOfRows);
|
||||
|
||||
// Create the iterator to read from cache
|
||||
if (pMem->numOfRows > 0) {
|
||||
iters = tsdbCreateCommitIters(pRepo);
|
||||
if (iters == NULL) {
|
||||
tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
if (tsdbInitWriteHelper(&whelper, pRepo) < 0) {
|
||||
tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s",
|
||||
REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision));
|
||||
int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision));
|
||||
|
||||
// Loop to commit to each file
|
||||
for (int fid = sfid; fid <= efid; fid++) {
|
||||
if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) {
|
||||
tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Commit to update meta file
|
||||
if (tsdbCommitMeta(pRepo) < 0) {
|
||||
tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
tsdbFitRetention(pRepo);
|
||||
|
||||
_exit:
|
||||
tdFreeDataCols(pDataCols);
|
||||
tsdbDestroyCommitIters(iters, pMem->maxTables);
|
||||
tsdbDestroyHelper(&whelper);
|
||||
tsdbEndCommit(pRepo);
|
||||
tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// ---------------- LOCAL FUNCTIONS ----------------
|
||||
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) {
|
||||
ASSERT(pRepo->mem != NULL);
|
||||
|
@ -529,69 +578,6 @@ static void tsdbFreeTableData(STableData *pTableData) {
|
|||
|
||||
static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple((SDataRow)data); }
|
||||
|
||||
static void *tsdbCommitData(void *arg) {
|
||||
STsdbRepo * pRepo = (STsdbRepo *)arg;
|
||||
SMemTable * pMem = pRepo->imem;
|
||||
STsdbCfg * pCfg = &pRepo->config;
|
||||
SDataCols * pDataCols = NULL;
|
||||
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||
SCommitIter *iters = NULL;
|
||||
SRWHelper whelper = {0};
|
||||
ASSERT(pRepo->commit == 1);
|
||||
ASSERT(pMem != NULL);
|
||||
|
||||
tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo),
|
||||
pMem->keyFirst, pMem->keyLast, pMem->numOfRows);
|
||||
|
||||
// Create the iterator to read from cache
|
||||
if (pMem->numOfRows > 0) {
|
||||
iters = tsdbCreateCommitIters(pRepo);
|
||||
if (iters == NULL) {
|
||||
tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
if (tsdbInitWriteHelper(&whelper, pRepo) < 0) {
|
||||
tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s",
|
||||
REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision));
|
||||
int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision));
|
||||
|
||||
// Loop to commit to each file
|
||||
for (int fid = sfid; fid <= efid; fid++) {
|
||||
if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) {
|
||||
tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Commit to update meta file
|
||||
if (tsdbCommitMeta(pRepo) < 0) {
|
||||
tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
tsdbFitRetention(pRepo);
|
||||
|
||||
_exit:
|
||||
tdFreeDataCols(pDataCols);
|
||||
tsdbDestroyCommitIters(iters, pMem->maxTables);
|
||||
tsdbDestroyHelper(&whelper);
|
||||
tsdbEndCommit(pRepo);
|
||||
tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int tsdbCommitMeta(STsdbRepo *pRepo) {
|
||||
SMemTable *pMem = pRepo->imem;
|
||||
|
@ -642,8 +628,8 @@ _err:
|
|||
}
|
||||
|
||||
static void tsdbEndCommit(STsdbRepo *pRepo) {
|
||||
ASSERT(pRepo->commit == 1);
|
||||
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER);
|
||||
sem_post(&(pRepo->readyToCommit));
|
||||
}
|
||||
|
||||
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
|
||||
|
|
|
@ -24,8 +24,8 @@ static SSkipListNode * getPriorNode(SSkipList *pSkipList, const char *val, in
|
|||
static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode);
|
||||
static void tSkipListCorrectLevel(SSkipList *pSkipList);
|
||||
static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t order);
|
||||
static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode);
|
||||
static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **forward, void *pData);
|
||||
static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **backward, SSkipListNode *pNode);
|
||||
static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData);
|
||||
static SSkipListNode * tSkipListNewNode(uint8_t level);
|
||||
#define tSkipListFreeNode(n) tfree((n))
|
||||
|
||||
|
@ -108,17 +108,17 @@ void tSkipListDestroy(SSkipList *pSkipList) {
|
|||
SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) {
|
||||
if (pSkipList == NULL || pData == NULL) return NULL;
|
||||
|
||||
SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
|
||||
SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0};
|
||||
uint8_t dupMode = SL_DUP_MODE(pSkipList);
|
||||
SSkipListNode *pNode = NULL;
|
||||
|
||||
tSkipListWLock(pSkipList);
|
||||
|
||||
bool hasDup = tSkipListGetPosToPut(pSkipList, forward, pData);
|
||||
bool hasDup = tSkipListGetPosToPut(pSkipList, backward, pData);
|
||||
|
||||
if (hasDup && (dupMode == SL_DISCARD_DUP_KEY || dupMode == SL_UPDATE_DUP_KEY)) {
|
||||
if (dupMode == SL_UPDATE_DUP_KEY) {
|
||||
pNode = SL_NODE_GET_FORWARD_POINTER(forward[0], 0);
|
||||
pNode = SL_NODE_GET_BACKWARD_POINTER(backward[0], 0);
|
||||
atomic_store_ptr(&(pNode->pData), pData);
|
||||
}
|
||||
} else {
|
||||
|
@ -126,7 +126,7 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) {
|
|||
if (pNode != NULL) {
|
||||
pNode->pData = pData;
|
||||
|
||||
tSkipListDoInsert(pSkipList, forward, pNode);
|
||||
tSkipListDoInsert(pSkipList, backward, pNode);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -310,7 +310,7 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
|
|||
}
|
||||
}
|
||||
|
||||
static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode) {
|
||||
static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **backward, SSkipListNode *pNode) {
|
||||
for (int32_t i = 0; i < pNode->level; ++i) {
|
||||
if (i >= pSkipList->level) {
|
||||
SL_NODE_GET_FORWARD_POINTER(pNode, i) = pSkipList->pTail;
|
||||
|
@ -318,14 +318,14 @@ static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSk
|
|||
SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, i) = pNode;
|
||||
SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i) = pNode;
|
||||
} else {
|
||||
SSkipListNode *x = forward[i];
|
||||
SL_NODE_GET_BACKWARD_POINTER(pNode, i) = x;
|
||||
SSkipListNode *x = backward[i];
|
||||
SL_NODE_GET_FORWARD_POINTER(pNode, i) = x;
|
||||
|
||||
SSkipListNode *next = SL_NODE_GET_FORWARD_POINTER(x, i);
|
||||
SL_NODE_GET_BACKWARD_POINTER(next, i) = pNode;
|
||||
SSkipListNode *prev = SL_NODE_GET_BACKWARD_POINTER(x, i);
|
||||
SL_NODE_GET_FORWARD_POINTER(prev, i) = pNode;
|
||||
|
||||
SL_NODE_GET_FORWARD_POINTER(pNode, i) = next;
|
||||
SL_NODE_GET_FORWARD_POINTER(x, i) = pNode;
|
||||
SL_NODE_GET_BACKWARD_POINTER(x, i) = pNode;
|
||||
SL_NODE_GET_BACKWARD_POINTER(pNode, i) = prev;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -371,57 +371,57 @@ static FORCE_INLINE int tSkipListUnlock(SSkipList *pSkipList) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **forward, void *pData) {
|
||||
static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData) {
|
||||
int compare = 0;
|
||||
bool hasDupKey = false;
|
||||
char * pDataKey = pSkipList->keyFn(pData);
|
||||
|
||||
if (pSkipList->size == 0) {
|
||||
for (int i = 0; i < pSkipList->level; i++) {
|
||||
forward[i] = pSkipList->pHead;
|
||||
backward[i] = pSkipList->pTail;
|
||||
}
|
||||
} else {
|
||||
char *pKey = NULL;
|
||||
|
||||
// Compare min key
|
||||
pKey = SL_GET_MIN_KEY(pSkipList);
|
||||
compare = pSkipList->comparFn(pDataKey, pKey);
|
||||
if (compare <= 0) {
|
||||
for (int i = 0; i < pSkipList->level; i++) {
|
||||
forward[i] = pSkipList->pHead;
|
||||
}
|
||||
|
||||
return (compare == 0);
|
||||
}
|
||||
|
||||
// Compare max key
|
||||
pKey = SL_GET_MAX_KEY(pSkipList);
|
||||
compare = pSkipList->comparFn(pDataKey, pKey);
|
||||
if (compare > 0) {
|
||||
if (compare >= 0) {
|
||||
for (int i = 0; i < pSkipList->level; i++) {
|
||||
forward[i] = SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i);
|
||||
backward[i] = pSkipList->pTail;
|
||||
}
|
||||
|
||||
return (compare == 0);
|
||||
}
|
||||
|
||||
SSkipListNode *px = pSkipList->pHead;
|
||||
// Compare min key
|
||||
pKey = SL_GET_MIN_KEY(pSkipList);
|
||||
compare = pSkipList->comparFn(pDataKey, pKey);
|
||||
if (compare < 0) {
|
||||
for (int i = 0; i < pSkipList->level; i++) {
|
||||
backward[i] = SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, i);
|
||||
}
|
||||
|
||||
return (compare == 0);
|
||||
}
|
||||
|
||||
SSkipListNode *px = pSkipList->pTail;
|
||||
for (int i = pSkipList->level - 1; i >= 0; --i) {
|
||||
SSkipListNode *p = SL_NODE_GET_FORWARD_POINTER(px, i);
|
||||
while (p != pSkipList->pTail) {
|
||||
SSkipListNode *p = SL_NODE_GET_BACKWARD_POINTER(px, i);
|
||||
while (p != pSkipList->pHead) {
|
||||
pKey = SL_GET_NODE_KEY(pSkipList, p);
|
||||
|
||||
compare = pSkipList->comparFn(pKey, pDataKey);
|
||||
if (compare >= 0) {
|
||||
if (compare <= 0) {
|
||||
if (compare == 0 && !hasDupKey) hasDupKey = true;
|
||||
break;
|
||||
} else {
|
||||
px = p;
|
||||
p = SL_NODE_GET_FORWARD_POINTER(px, i);
|
||||
p = SL_NODE_GET_BACKWARD_POINTER(px, i);
|
||||
}
|
||||
}
|
||||
|
||||
forward[i] = px;
|
||||
backward[i] = px;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -377,7 +377,8 @@ int taosCheckVersion(char *input_client_version, char *input_server_version, int
|
|||
|
||||
for(int32_t i = 0; i < comparedSegments; ++i) {
|
||||
if (clientVersionNumber[i] != serverVersionNumber[i]) {
|
||||
uError("the %d-th number of server version:%s not matched with client version:%s", i, server_version, version);
|
||||
uError("the %d-th number of server version:%s not matched with client version:%s", i, server_version,
|
||||
client_version);
|
||||
return TSDB_CODE_TSC_INVALID_VERSION;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,10 +67,17 @@ int32_t vnodeInitResources() {
|
|||
return TSDB_CODE_VND_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
if (tsdbInitCommitQueue(tsNumOfCommitThreads) < 0) {
|
||||
vError("failed to init vnode commit queue");
|
||||
return terrno;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void vnodeCleanupResources() {
|
||||
tsdbDestroyCommitQueue();
|
||||
|
||||
if (tsVnodesHash != NULL) {
|
||||
vDebug("vnode list is cleanup");
|
||||
taosHashCleanup(tsVnodesHash);
|
||||
|
@ -308,6 +315,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
|||
pVnode->version = walGetVersion(pVnode->wal);
|
||||
}
|
||||
|
||||
tsdbSyncCommit(pVnode->tsdb);
|
||||
walRemoveAllOldFiles(pVnode->wal);
|
||||
walRenew(pVnode->wal);
|
||||
|
||||
SSyncInfo syncInfo;
|
||||
|
@ -583,6 +592,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status) {
|
|||
|
||||
if (status == TSDB_STATUS_COMMIT_OVER) {
|
||||
vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
|
||||
walRemoveOneOldFile(pVnode->wal);
|
||||
return vnodeSaveVersion(pVnode);
|
||||
}
|
||||
|
||||
|
|
|
@ -229,7 +229,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
|||
if (handle == NULL) { // failed to register qhandle
|
||||
pRsp->code = terrno;
|
||||
terrno = 0;
|
||||
vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
|
||||
vError("vgId:%d, QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
|
||||
tstrerror(pRsp->code));
|
||||
qDestroyQueryInfo(pQInfo); // destroy it directly
|
||||
return pRsp->code;
|
||||
|
|
|
@ -217,6 +217,11 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
|
|||
if (code != TSDB_CODE_SUCCESS) return code;
|
||||
}
|
||||
|
||||
if (pHead->len > TSDB_MAX_WAL_SIZE) {
|
||||
vError("vgId:%d, wal len:%d exceeds limit, hver:%" PRIu64, pVnode->vgId, pHead->len, pHead->version);
|
||||
return TSDB_CODE_WAL_SIZE_LIMIT;
|
||||
}
|
||||
|
||||
int32_t size = sizeof(SVWriteMsg) + sizeof(SWalHead) + pHead->len;
|
||||
SVWriteMsg *pWrite = taosAllocateQitem(size);
|
||||
if (pWrite == NULL) {
|
||||
|
|
|
@ -34,7 +34,7 @@ extern int32_t wDebugFlag;
|
|||
#define WAL_PREFIX "wal"
|
||||
#define WAL_PREFIX_LEN 3
|
||||
#define WAL_REFRESH_MS 1000
|
||||
#define WAL_MAX_SIZE (1024 * 1024)
|
||||
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
|
||||
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE))
|
||||
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
|
||||
#define WAL_FILE_LEN (TSDB_FILENAME_LEN + 32)
|
||||
|
|
|
@ -128,16 +128,7 @@ void walClose(void *handle) {
|
|||
taosClose(pWal->fd);
|
||||
|
||||
if (pWal->keep != TAOS_WAL_KEEP) {
|
||||
int64_t fileId = -1;
|
||||
while (walGetNextFile(pWal, &fileId) >= 0) {
|
||||
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
|
||||
|
||||
if (remove(pWal->name) < 0) {
|
||||
wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name);
|
||||
} else {
|
||||
wDebug("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name);
|
||||
}
|
||||
}
|
||||
walRemoveAllOldFiles(pWal);
|
||||
} else {
|
||||
wDebug("vgId:%d, wal:%p file:%s, it is closed and kept", pWal->vgId, pWal, pWal->name);
|
||||
}
|
||||
|
|
|
@ -58,24 +58,48 @@ int32_t walRenew(void *handle) {
|
|||
wDebug("vgId:%d, file:%s, it is created", pWal->vgId, pWal->name);
|
||||
}
|
||||
|
||||
if (pWal->keep != TAOS_WAL_KEEP) {
|
||||
// remove the oldest wal file
|
||||
int64_t oldFileId = -1;
|
||||
if (walGetOldFile(pWal, pWal->fileId, WAL_FILE_NUM, &oldFileId) == 0) {
|
||||
char walName[WAL_FILE_LEN] = {0};
|
||||
snprintf(walName, sizeof(walName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, oldFileId);
|
||||
pthread_mutex_unlock(&pWal->mutex);
|
||||
|
||||
if (remove(walName) < 0) {
|
||||
wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, walName, strerror(errno));
|
||||
} else {
|
||||
wDebug("vgId:%d, file:%s, it is removed", pWal->vgId, walName);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
void walRemoveOneOldFile(void *handle) {
|
||||
SWal *pWal = handle;
|
||||
if (pWal == NULL) return;
|
||||
if (pWal->keep == TAOS_WAL_KEEP) return;
|
||||
|
||||
pthread_mutex_lock(&pWal->mutex);
|
||||
|
||||
// remove the oldest wal file
|
||||
int64_t oldFileId = -1;
|
||||
if (walGetOldFile(pWal, pWal->fileId, WAL_FILE_NUM, &oldFileId) == 0) {
|
||||
char walName[WAL_FILE_LEN] = {0};
|
||||
snprintf(walName, sizeof(walName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, oldFileId);
|
||||
|
||||
if (remove(walName) < 0) {
|
||||
wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, walName, strerror(errno));
|
||||
} else {
|
||||
wInfo("vgId:%d, file:%s, it is removed", pWal->vgId, walName);
|
||||
}
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&pWal->mutex);
|
||||
}
|
||||
|
||||
return code;
|
||||
void walRemoveAllOldFiles(void *handle) {
|
||||
if (handle == NULL) return;
|
||||
|
||||
SWal * pWal = handle;
|
||||
int64_t fileId = -1;
|
||||
while (walGetNextFile(pWal, &fileId) >= 0) {
|
||||
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
|
||||
|
||||
if (remove(pWal->name) < 0) {
|
||||
wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name);
|
||||
} else {
|
||||
wInfo("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t walWrite(void *handle, SWalHead *pHead) {
|
||||
|
|
|
@ -0,0 +1,77 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
|
||||
print ============== deploy
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 3001
|
||||
sql connect
|
||||
|
||||
sql create database d1
|
||||
sql use d1
|
||||
|
||||
sql create table t1 (ts timestamp, i int)
|
||||
sql insert into t1 values(now, 1);
|
||||
|
||||
print =============== step3
|
||||
sleep 3000
|
||||
sql select * from t1;
|
||||
print rows: $rows
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
|
||||
sleep 3000
|
||||
|
||||
print =============== step4
|
||||
system sh/exec.sh -n dnode1 -s start -x SIGKILL
|
||||
sleep 3000
|
||||
sql select * from t1;
|
||||
print rows: $rows
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
|
||||
sleep 3000
|
||||
|
||||
print =============== step5
|
||||
system sh/exec.sh -n dnode1 -s start -x SIGKILL
|
||||
sleep 3000
|
||||
sql select * from t1;
|
||||
print rows: $rows
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
|
||||
sleep 3000
|
||||
|
||||
print =============== step6
|
||||
system sh/exec.sh -n dnode1 -s start -x SIGKILL
|
||||
sleep 3000
|
||||
sql select * from t1;
|
||||
print rows: $rows
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
|
||||
sleep 3000
|
||||
|
||||
print =============== step7
|
||||
system sh/exec.sh -n dnode1 -s start -x SIGKILL
|
||||
sleep 3000
|
||||
sql select * from t1;
|
||||
print rows: $rows
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
|
||||
sleep 3000
|
||||
|
||||
print =============== step8
|
||||
system sh/exec.sh -n dnode1 -s start -x SIGKILL
|
||||
sleep 3000
|
||||
sql select * from t1;
|
||||
print rows: $rows
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
|
|
@ -0,0 +1,46 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 100
|
||||
system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 1
|
||||
system sh/cfg.sh -n dnode1 -c tableIncStepPerVnode -v 2
|
||||
|
||||
|
||||
print ============== deploy
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 3001
|
||||
sql connect
|
||||
|
||||
sql create database d1
|
||||
sql use d1
|
||||
sql create table st (ts timestamp, tbcol int) TAGS(tgcol int)
|
||||
|
||||
$i = 0
|
||||
while $i < 100
|
||||
$tb = t . $i
|
||||
sql create table $tb using st tags( $i )
|
||||
sql insert into $tb values (now , $i )
|
||||
$i = $i + 1
|
||||
endw
|
||||
|
||||
sql_error sql create table tt (ts timestamp, i int)
|
||||
|
||||
print =============== step3
|
||||
sql select * from st;
|
||||
if $rows != 100 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 4
|
||||
sleep 3000
|
||||
|
||||
print =============== step4
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 3000
|
||||
|
||||
sql select * from st;
|
||||
if $rows != 100 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -0,0 +1,124 @@
|
|||
system sh/stop_dnodes.sh
|
||||
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/deploy.sh -n dnode2 -i 2
|
||||
system sh/deploy.sh -n dnode3 -i 3
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 3
|
||||
system sh/cfg.sh -n dnode2 -c numOfMnodes -v 3
|
||||
system sh/cfg.sh -n dnode3 -c numOfMnodes -v 3
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4
|
||||
system sh/cfg.sh -n dnode2 -c mnodeEqualVnodeNum -v 4
|
||||
system sh/cfg.sh -n dnode3 -c mnodeEqualVnodeNum -v 4
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c http -v 1
|
||||
system sh/cfg.sh -n dnode2 -c http -v 1
|
||||
system sh/cfg.sh -n dnode3 -c http -v 1
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 20000
|
||||
system sh/cfg.sh -n dnode2 -c maxTablesPerVnode -v 20000
|
||||
system sh/cfg.sh -n dnode3 -c maxTablesPerVnode -v 20000
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c replica -v 3
|
||||
system sh/cfg.sh -n dnode2 -c replica -v 3
|
||||
system sh/cfg.sh -n dnode3 -c replica -v 3
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c maxSQLLength -v 940032
|
||||
system sh/cfg.sh -n dnode2 -c maxSQLLength -v 940032
|
||||
system sh/cfg.sh -n dnode3 -c maxSQLLength -v 940032
|
||||
|
||||
print ============== deploy
|
||||
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 5001
|
||||
sql connect
|
||||
|
||||
sql create dnode $hostname2
|
||||
sql create dnode $hostname3
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
system sh/exec.sh -n dnode3 -s start
|
||||
|
||||
print =============== step1
|
||||
$x = 0
|
||||
show1:
|
||||
$x = $x + 1
|
||||
sleep 2000
|
||||
if $x == 5 then
|
||||
return -1
|
||||
endi
|
||||
sql show mnodes -x show1
|
||||
$mnode1Role = $data2_1
|
||||
print mnode1Role $mnode1Role
|
||||
$mnode2Role = $data2_2
|
||||
print mnode2Role $mnode2Role
|
||||
$mnode3Role = $data2_3
|
||||
print mnode3Role $mnode3Role
|
||||
|
||||
if $mnode1Role != master then
|
||||
goto show1
|
||||
endi
|
||||
if $mnode2Role != slave then
|
||||
goto show1
|
||||
endi
|
||||
if $mnode3Role != slave then
|
||||
goto show1
|
||||
endi
|
||||
|
||||
print =============== step2
|
||||
sql create database d1 replica 3
|
||||
sql use d1
|
||||
|
||||
sql create table table_rest (ts timestamp, i int)
|
||||
print sql length is 870KB
|
||||
restful d1 table_rest 1591072800 30000
|
||||
restful d1 table_rest 1591172800 30000
|
||||
restful d1 table_rest 1591272800 30000
|
||||
restful d1 table_rest 1591372800 30000
|
||||
restful d1 table_rest 1591472800 30000
|
||||
restful d1 table_rest 1591572800 30000
|
||||
restful d1 table_rest 1591672800 30000
|
||||
restful d1 table_rest 1591772800 30000
|
||||
restful d1 table_rest 1591872800 30000
|
||||
restful d1 table_rest 1591972800 30000
|
||||
|
||||
sql select * from table_rest;
|
||||
print rows: $rows
|
||||
if $rows != 300000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== step3
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
sleep 5000
|
||||
sql select * from table_rest;
|
||||
print rows: $rows
|
||||
if $rows != 300000 then
|
||||
return -1
|
||||
endi
|
||||
system sh/exec.sh -n dnode1 -s start -x SIGINT
|
||||
sleep 5000
|
||||
|
||||
print =============== step4
|
||||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||
sleep 5000
|
||||
sql select * from table_rest;
|
||||
print rows: $rows
|
||||
if $rows != 300000 then
|
||||
return -1
|
||||
endi
|
||||
system sh/exec.sh -n dnode2 -s start -x SIGINT
|
||||
sleep 5000
|
||||
|
||||
print =============== step5
|
||||
system sh/exec.sh -n dnode3 -s stop -x SIGINT
|
||||
sleep 5000
|
||||
sql select * from table_rest;
|
||||
print rows: $rows
|
||||
if $rows != 300000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode3 -s stop -x SIGINT
|
|
@ -236,6 +236,10 @@ cd ../../../debug; make
|
|||
./test.sh -f general/vector/table_query.sim
|
||||
./test.sh -f general/vector/table_time.sim
|
||||
|
||||
./test.sh -f general/wal/sync.sim
|
||||
./test.sh -f general/wal/kill.sim
|
||||
./test.sh -f general/wal/maxtables.sim
|
||||
|
||||
./test.sh -f unique/account/account_create.sim
|
||||
./test.sh -f unique/account/account_delete.sim
|
||||
./test.sh -f unique/account/account_len.sim
|
||||
|
|
Loading…
Reference in New Issue