Merge remote-tracking branch 'origin' into xiaoping/add_test_case
This commit is contained in:
commit
c6a9b30b80
|
@ -4661,7 +4661,7 @@ static int64_t generateData(char *recBuf, char **data_type,
|
||||||
double t = rand_double();
|
double t = rand_double();
|
||||||
pstr += sprintf(pstr, ",%20.8f", t);
|
pstr += sprintf(pstr, ",%20.8f", t);
|
||||||
} else if (strcasecmp(data_type[i % c], "BOOL") == 0) {
|
} else if (strcasecmp(data_type[i % c], "BOOL") == 0) {
|
||||||
bool b = taosRandom() & 1;
|
bool b = rand_bool() & 1;
|
||||||
pstr += sprintf(pstr, ",%s", b ? "true" : "false");
|
pstr += sprintf(pstr, ",%s", b ? "true" : "false");
|
||||||
} else if (strcasecmp(data_type[i % c], "BINARY") == 0) {
|
} else if (strcasecmp(data_type[i % c], "BINARY") == 0) {
|
||||||
char *s = malloc(lenOfBinary);
|
char *s = malloc(lenOfBinary);
|
||||||
|
|
|
@ -28,6 +28,7 @@ typedef struct {
|
||||||
int bufBlockSize;
|
int bufBlockSize;
|
||||||
int tBufBlocks;
|
int tBufBlocks;
|
||||||
int nBufBlocks;
|
int nBufBlocks;
|
||||||
|
int nRecycleBlocks;
|
||||||
int64_t index;
|
int64_t index;
|
||||||
SList* bufBlockList;
|
SList* bufBlockList;
|
||||||
} STsdbBufPool;
|
} STsdbBufPool;
|
||||||
|
@ -39,5 +40,7 @@ void tsdbFreeBufPool(STsdbBufPool* pBufPool);
|
||||||
int tsdbOpenBufPool(STsdbRepo* pRepo);
|
int tsdbOpenBufPool(STsdbRepo* pRepo);
|
||||||
void tsdbCloseBufPool(STsdbRepo* pRepo);
|
void tsdbCloseBufPool(STsdbRepo* pRepo);
|
||||||
SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
|
SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
|
||||||
|
int tsdbExpendPool(STsdbRepo* pRepo, int32_t oldTotalBlocks);
|
||||||
|
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode);
|
||||||
|
|
||||||
#endif /* _TD_TSDB_BUFFER_H_ */
|
#endif /* _TD_TSDB_BUFFER_H_ */
|
|
@ -71,6 +71,11 @@ struct STsdbRepo {
|
||||||
uint8_t state;
|
uint8_t state;
|
||||||
|
|
||||||
STsdbCfg config;
|
STsdbCfg config;
|
||||||
|
|
||||||
|
STsdbCfg save_config; // save apply config
|
||||||
|
bool config_changed; // config changed flag
|
||||||
|
pthread_mutex_t save_mutex; // protect save config
|
||||||
|
|
||||||
STsdbAppH appH;
|
STsdbAppH appH;
|
||||||
STsdbStat stat;
|
STsdbStat stat;
|
||||||
STsdbMeta* tsdbMeta;
|
STsdbMeta* tsdbMeta;
|
||||||
|
|
|
@ -70,6 +70,7 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) {
|
||||||
pPool->tBufBlocks = pCfg->totalBlocks;
|
pPool->tBufBlocks = pCfg->totalBlocks;
|
||||||
pPool->nBufBlocks = 0;
|
pPool->nBufBlocks = 0;
|
||||||
pPool->index = 0;
|
pPool->index = 0;
|
||||||
|
pPool->nRecycleBlocks = 0;
|
||||||
|
|
||||||
for (int i = 0; i < pCfg->totalBlocks; i++) {
|
for (int i = 0; i < pCfg->totalBlocks; i++) {
|
||||||
STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
|
STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
|
||||||
|
@ -157,3 +158,45 @@ _err:
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); }
|
static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); }
|
||||||
|
|
||||||
|
int tsdbExpendPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) {
|
||||||
|
if (oldTotalBlocks == pRepo->config.totalBlocks) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int err = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
if (tsdbLockRepo(pRepo) < 0) return terrno;
|
||||||
|
STsdbBufPool* pPool = pRepo->pPool;
|
||||||
|
|
||||||
|
if (pRepo->config.totalBlocks > oldTotalBlocks) {
|
||||||
|
for (int i = 0; i < pRepo->config.totalBlocks - oldTotalBlocks; i++) {
|
||||||
|
STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
|
||||||
|
if (pBufBlock == NULL) goto err;
|
||||||
|
|
||||||
|
if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) {
|
||||||
|
tsdbFreeBufBlock(pBufBlock);
|
||||||
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
err = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
goto err;
|
||||||
|
}
|
||||||
|
|
||||||
|
pPool->nBufBlocks++;
|
||||||
|
}
|
||||||
|
pthread_cond_signal(&pPool->poolNotEmpty);
|
||||||
|
} else {
|
||||||
|
pPool->nRecycleBlocks = oldTotalBlocks - pRepo->config.totalBlocks;
|
||||||
|
}
|
||||||
|
|
||||||
|
err:
|
||||||
|
tsdbUnlockRepo(pRepo);
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode) {
|
||||||
|
STsdbBufBlock *pBufBlock = NULL;
|
||||||
|
tdListNodeGetData(pPool->bufBlockList, pNode, (void *)(&pBufBlock));
|
||||||
|
tsdbFreeBufBlock(pBufBlock);
|
||||||
|
free(pNode);
|
||||||
|
pPool->nBufBlocks--;
|
||||||
|
}
|
|
@ -112,6 +112,32 @@ int tsdbScheduleCommit(STsdbRepo *pRepo) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void tsdbApplyRepoConfig(STsdbRepo *pRepo) {
|
||||||
|
pRepo->config_changed = false;
|
||||||
|
STsdbCfg * pSaveCfg = &pRepo->save_config;
|
||||||
|
|
||||||
|
int32_t oldTotalBlocks = pRepo->config.totalBlocks;
|
||||||
|
|
||||||
|
pRepo->config.compression = pRepo->save_config.compression;
|
||||||
|
pRepo->config.keep = pRepo->save_config.keep;
|
||||||
|
pRepo->config.keep1 = pRepo->save_config.keep1;
|
||||||
|
pRepo->config.keep2 = pRepo->save_config.keep2;
|
||||||
|
pRepo->config.cacheLastRow = pRepo->save_config.cacheLastRow;
|
||||||
|
pRepo->config.totalBlocks = pRepo->save_config.totalBlocks;
|
||||||
|
|
||||||
|
tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d),totalBlocks(%d)",
|
||||||
|
REPO_ID(pRepo),
|
||||||
|
pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2,
|
||||||
|
pSaveCfg->totalBlocks, pSaveCfg->cacheLastRow, pSaveCfg->totalBlocks);
|
||||||
|
|
||||||
|
int err = tsdbExpendPool(pRepo, oldTotalBlocks);
|
||||||
|
if (!TAOS_SUCCEEDED(err)) {
|
||||||
|
tsdbError("vgId:%d expand pool from %d to %d fail,reason:%s",
|
||||||
|
REPO_ID(pRepo), oldTotalBlocks, pSaveCfg->totalBlocks, tstrerror(err));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
static void *tsdbLoopCommit(void *arg) {
|
static void *tsdbLoopCommit(void *arg) {
|
||||||
SCommitQueue *pQueue = &tsCommitQueue;
|
SCommitQueue *pQueue = &tsCommitQueue;
|
||||||
SListNode * pNode = NULL;
|
SListNode * pNode = NULL;
|
||||||
|
@ -138,6 +164,13 @@ static void *tsdbLoopCommit(void *arg) {
|
||||||
|
|
||||||
pRepo = ((SCommitReq *)pNode->data)->pRepo;
|
pRepo = ((SCommitReq *)pNode->data)->pRepo;
|
||||||
|
|
||||||
|
// check if need to apply new config
|
||||||
|
if (pRepo->config_changed) {
|
||||||
|
pthread_mutex_lock(&pRepo->save_mutex);
|
||||||
|
tsdbApplyRepoConfig(pRepo);
|
||||||
|
pthread_mutex_unlock(&pRepo->save_mutex);
|
||||||
|
}
|
||||||
|
|
||||||
tsdbCommitData(pRepo);
|
tsdbCommitData(pRepo);
|
||||||
listNodeFree(pNode);
|
listNodeFree(pNode);
|
||||||
}
|
}
|
||||||
|
|
|
@ -203,6 +203,70 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int
|
||||||
|
|
||||||
int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) {
|
int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) {
|
||||||
// TODO: think about multithread cases
|
// TODO: think about multithread cases
|
||||||
|
if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return -1;
|
||||||
|
|
||||||
|
STsdbCfg * pRCfg = &repo->config;
|
||||||
|
|
||||||
|
ASSERT(pRCfg->tsdbId == pCfg->tsdbId);
|
||||||
|
ASSERT(pRCfg->cacheBlockSize == pCfg->cacheBlockSize);
|
||||||
|
ASSERT(pRCfg->daysPerFile == pCfg->daysPerFile);
|
||||||
|
ASSERT(pRCfg->minRowsPerFileBlock == pCfg->minRowsPerFileBlock);
|
||||||
|
ASSERT(pRCfg->maxRowsPerFileBlock == pCfg->maxRowsPerFileBlock);
|
||||||
|
ASSERT(pRCfg->precision == pCfg->precision);
|
||||||
|
|
||||||
|
bool configChanged = false;
|
||||||
|
if (pRCfg->compression != pCfg->compression) {
|
||||||
|
configChanged = true;
|
||||||
|
}
|
||||||
|
if (pRCfg->keep != pCfg->keep) {
|
||||||
|
configChanged = true;
|
||||||
|
}
|
||||||
|
if (pRCfg->keep1 != pCfg->keep1) {
|
||||||
|
configChanged = true;
|
||||||
|
}
|
||||||
|
if (pRCfg->keep2 != pCfg->keep2) {
|
||||||
|
configChanged = true;
|
||||||
|
}
|
||||||
|
if (pRCfg->cacheLastRow != pCfg->cacheLastRow) {
|
||||||
|
configChanged = true;
|
||||||
|
}
|
||||||
|
if (pRCfg->totalBlocks != pCfg->totalBlocks) {
|
||||||
|
configChanged = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!configChanged) {
|
||||||
|
tsdbError("vgId:%d no config changed", REPO_ID(repo));
|
||||||
|
}
|
||||||
|
|
||||||
|
int code = pthread_mutex_lock(&repo->save_mutex);
|
||||||
|
if (code != 0) {
|
||||||
|
tsdbError("vgId:%d failed to lock tsdb save config mutex since %s", REPO_ID(repo), strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
STsdbCfg * pSaveCfg = &repo->save_config;
|
||||||
|
*pSaveCfg = repo->config;
|
||||||
|
|
||||||
|
pSaveCfg->compression = pCfg->compression;
|
||||||
|
pSaveCfg->keep = pCfg->keep;
|
||||||
|
pSaveCfg->keep1 = pCfg->keep1;
|
||||||
|
pSaveCfg->keep2 = pCfg->keep2;
|
||||||
|
pSaveCfg->cacheLastRow = pCfg->cacheLastRow;
|
||||||
|
pSaveCfg->totalBlocks = pCfg->totalBlocks;
|
||||||
|
|
||||||
|
tsdbInfo("vgId:%d old config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)",
|
||||||
|
REPO_ID(repo),
|
||||||
|
pRCfg->compression, pRCfg->keep, pRCfg->keep1,pRCfg->keep2,
|
||||||
|
pRCfg->cacheLastRow, pRCfg->totalBlocks);
|
||||||
|
tsdbInfo("vgId:%d new config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)",
|
||||||
|
REPO_ID(repo),
|
||||||
|
pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2,
|
||||||
|
pSaveCfg->cacheLastRow,pSaveCfg->totalBlocks);
|
||||||
|
|
||||||
|
repo->config_changed = true;
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&repo->save_mutex);
|
||||||
return 0;
|
return 0;
|
||||||
#if 0
|
#if 0
|
||||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||||
|
@ -474,6 +538,14 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
code = pthread_mutex_init(&(pRepo->save_mutex), NULL);
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
tsdbFreeRepo(pRepo);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pRepo->config_changed = false;
|
||||||
|
|
||||||
code = tsem_init(&(pRepo->readyToCommit), 0, 1);
|
code = tsem_init(&(pRepo->readyToCommit), 0, 1);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
code = errno;
|
code = errno;
|
||||||
|
|
|
@ -98,10 +98,17 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
|
||||||
STsdbBufPool *pBufPool = pRepo->pPool;
|
STsdbBufPool *pBufPool = pRepo->pPool;
|
||||||
|
|
||||||
SListNode *pNode = NULL;
|
SListNode *pNode = NULL;
|
||||||
|
bool recycleBlocks = pBufPool->nRecycleBlocks > 0;
|
||||||
if (tsdbLockRepo(pRepo) < 0) return -1;
|
if (tsdbLockRepo(pRepo) < 0) return -1;
|
||||||
while ((pNode = tdListPopHead(pMemTable->bufBlockList)) != NULL) {
|
while ((pNode = tdListPopHead(pMemTable->bufBlockList)) != NULL) {
|
||||||
|
if (pBufPool->nRecycleBlocks > 0) {
|
||||||
|
tsdbRecycleBufferBlock(pBufPool, pNode);
|
||||||
|
pBufPool->nRecycleBlocks -= 1;
|
||||||
|
} else {
|
||||||
tdListAppendNode(pBufPool->bufBlockList, pNode);
|
tdListAppendNode(pBufPool->bufBlockList, pNode);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
if (!recycleBlocks) {
|
||||||
int code = pthread_cond_signal(&pBufPool->poolNotEmpty);
|
int code = pthread_cond_signal(&pBufPool->poolNotEmpty);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
if (tsdbUnlockRepo(pRepo) < 0) return -1;
|
if (tsdbUnlockRepo(pRepo) < 0) return -1;
|
||||||
|
@ -109,6 +116,8 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (tsdbUnlockRepo(pRepo) < 0) return -1;
|
if (tsdbUnlockRepo(pRepo) < 0) return -1;
|
||||||
|
|
||||||
for (int i = 0; i < pMemTable->maxTables; i++) {
|
for (int i = 0; i < pMemTable->maxTables; i++) {
|
||||||
|
@ -958,6 +967,15 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) {
|
||||||
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row) {
|
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row) {
|
||||||
STsdbCfg *pCfg = &pRepo->config;
|
STsdbCfg *pCfg = &pRepo->config;
|
||||||
|
|
||||||
|
// if cacheLastRow config has been reset, free the lastRow
|
||||||
|
if (!pCfg->cacheLastRow && pTable->lastRow != NULL) {
|
||||||
|
taosTZfree(pTable->lastRow);
|
||||||
|
TSDB_WLOCK_TABLE(pTable);
|
||||||
|
pTable->lastRow = NULL;
|
||||||
|
pTable->lastKey = TSKEY_INITIAL_VAL;
|
||||||
|
TSDB_WUNLOCK_TABLE(pTable);
|
||||||
|
}
|
||||||
|
|
||||||
if (tsdbGetTableLastKeyImpl(pTable) < dataRowKey(row)) {
|
if (tsdbGetTableLastKeyImpl(pTable) < dataRowKey(row)) {
|
||||||
if (pCfg->cacheLastRow || pTable->lastRow != NULL) {
|
if (pCfg->cacheLastRow || pTable->lastRow != NULL) {
|
||||||
SDataRow nrow = pTable->lastRow;
|
SDataRow nrow = pTable->lastRow;
|
||||||
|
|
|
@ -170,9 +170,10 @@ static int32_t vnodeAlterImp(SVnodeObj *pVnode, SCreateVnodeMsg *pVnodeCfg) {
|
||||||
|
|
||||||
vDebug("vgId:%d, tsdbchanged:%d syncchanged:%d while alter vnode", pVnode->vgId, tsdbCfgChanged, syncCfgChanged);
|
vDebug("vgId:%d, tsdbchanged:%d syncchanged:%d while alter vnode", pVnode->vgId, tsdbCfgChanged, syncCfgChanged);
|
||||||
|
|
||||||
if (/*tsdbCfgChanged || */syncCfgChanged) {
|
if (tsdbCfgChanged || syncCfgChanged) {
|
||||||
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
|
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
|
||||||
// dbCfgVersion can be corrected by status msg
|
// dbCfgVersion can be corrected by status msg
|
||||||
|
if (syncCfgChanged) {
|
||||||
if (!vnodeSetUpdatingStatus(pVnode)) {
|
if (!vnodeSetUpdatingStatus(pVnode)) {
|
||||||
vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId);
|
vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId);
|
||||||
pVnode->dbCfgVersion = dbCfgVersion;
|
pVnode->dbCfgVersion = dbCfgVersion;
|
||||||
|
@ -191,8 +192,9 @@ static int32_t vnodeAlterImp(SVnodeObj *pVnode, SCreateVnodeMsg *pVnodeCfg) {
|
||||||
vnodeSetReadyStatus(pVnode);
|
vnodeSetReadyStatus(pVnode);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (pVnode->tsdb) {
|
if (tsdbCfgChanged && pVnode->tsdb) {
|
||||||
code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg);
|
code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pVnode->dbCfgVersion = dbCfgVersion;
|
pVnode->dbCfgVersion = dbCfgVersion;
|
||||||
|
|
Loading…
Reference in New Issue