Merge branch 'feature/TD-4034' into feature/TD-3950
This commit is contained in:
commit
1abd799eba
|
@ -3492,16 +3492,6 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
||||||
|
|
||||||
}
|
}
|
||||||
g_args.interlace_rows = interlaceRows->valueint;
|
g_args.interlace_rows = interlaceRows->valueint;
|
||||||
|
|
||||||
// rows per table need be less than insert batch
|
|
||||||
if (g_args.interlace_rows > g_args.num_of_RPR) {
|
|
||||||
printf("NOTICE: interlace rows value %"PRIu64" > num_of_records_per_req %"PRIu64"\n\n",
|
|
||||||
g_args.interlace_rows, g_args.num_of_RPR);
|
|
||||||
printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n",
|
|
||||||
g_args.num_of_RPR);
|
|
||||||
prompt();
|
|
||||||
g_args.interlace_rows = g_args.num_of_RPR;
|
|
||||||
}
|
|
||||||
} else if (!interlaceRows) {
|
} else if (!interlaceRows) {
|
||||||
g_args.interlace_rows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
|
g_args.interlace_rows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
|
||||||
} else {
|
} else {
|
||||||
|
@ -3567,6 +3557,16 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// rows per table need be less than insert batch
|
||||||
|
if (g_args.interlace_rows > g_args.num_of_RPR) {
|
||||||
|
printf("NOTICE: interlace rows value %"PRIu64" > num_of_records_per_req %"PRIu64"\n\n",
|
||||||
|
g_args.interlace_rows, g_args.num_of_RPR);
|
||||||
|
printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n",
|
||||||
|
g_args.num_of_RPR);
|
||||||
|
prompt();
|
||||||
|
g_args.interlace_rows = g_args.num_of_RPR;
|
||||||
|
}
|
||||||
|
|
||||||
cJSON* dbs = cJSON_GetObjectItem(root, "databases");
|
cJSON* dbs = cJSON_GetObjectItem(root, "databases");
|
||||||
if (!dbs || dbs->type != cJSON_Array) {
|
if (!dbs || dbs->type != cJSON_Array) {
|
||||||
printf("ERROR: failed to read json, databases not found\n");
|
printf("ERROR: failed to read json, databases not found\n");
|
||||||
|
@ -4890,10 +4890,12 @@ static int64_t execInsert(threadInfo *pThreadInfo, uint64_t k)
|
||||||
return affectedRows;
|
return affectedRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void getTableName(char *pTblName, threadInfo* pThreadInfo, uint64_t tableSeq)
|
static void getTableName(char *pTblName,
|
||||||
|
threadInfo* pThreadInfo, uint64_t tableSeq)
|
||||||
{
|
{
|
||||||
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
|
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
|
||||||
if (superTblInfo) {
|
if ((superTblInfo)
|
||||||
|
&& (AUTO_CREATE_SUBTBL != superTblInfo->autoCreateTable)) {
|
||||||
if (superTblInfo->childTblLimit > 0) {
|
if (superTblInfo->childTblLimit > 0) {
|
||||||
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s",
|
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s",
|
||||||
superTblInfo->childTblName +
|
superTblInfo->childTblName +
|
||||||
|
@ -6768,6 +6770,7 @@ static void *superSubscribe(void *sarg) {
|
||||||
threadInfo *pThreadInfo = (threadInfo *)sarg;
|
threadInfo *pThreadInfo = (threadInfo *)sarg;
|
||||||
char subSqlstr[MAX_QUERY_SQL_LENGTH];
|
char subSqlstr[MAX_QUERY_SQL_LENGTH];
|
||||||
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0};
|
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0};
|
||||||
|
uint64_t tsubSeq;
|
||||||
|
|
||||||
if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) {
|
if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) {
|
||||||
errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n",
|
errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n",
|
||||||
|
@ -6776,6 +6779,15 @@ static void *superSubscribe(void *sarg) {
|
||||||
exit(-1);
|
exit(-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (g_queryInfo.superQueryInfo.sqlCount * pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) {
|
||||||
|
errorPrint("The number %"PRId64" of sql count(%"PRIu64") multiple the table number(%"PRId64") of the thread is more than max query sql count: %d\n",
|
||||||
|
g_queryInfo.superQueryInfo.sqlCount * pThreadInfo->ntables,
|
||||||
|
g_queryInfo.superQueryInfo.sqlCount,
|
||||||
|
pThreadInfo->ntables,
|
||||||
|
MAX_QUERY_SQL_COUNT);
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
if (pThreadInfo->taos == NULL) {
|
if (pThreadInfo->taos == NULL) {
|
||||||
TAOS * taos = NULL;
|
TAOS * taos = NULL;
|
||||||
taos = taos_connect(g_queryInfo.host,
|
taos = taos_connect(g_queryInfo.host,
|
||||||
|
@ -6804,6 +6816,8 @@ static void *superSubscribe(void *sarg) {
|
||||||
char topic[32] = {0};
|
char topic[32] = {0};
|
||||||
for (uint64_t i = pThreadInfo->start_table_from;
|
for (uint64_t i = pThreadInfo->start_table_from;
|
||||||
i <= pThreadInfo->end_table_to; i++) {
|
i <= pThreadInfo->end_table_to; i++) {
|
||||||
|
|
||||||
|
tsubSeq = i - pThreadInfo->start_table_from;
|
||||||
verbosePrint("%s() LN%d, [%d], start=%"PRId64" end=%"PRId64" i=%"PRIu64"\n",
|
verbosePrint("%s() LN%d, [%d], start=%"PRId64" end=%"PRId64" i=%"PRIu64"\n",
|
||||||
__func__, __LINE__,
|
__func__, __LINE__,
|
||||||
pThreadInfo->threadID,
|
pThreadInfo->threadID,
|
||||||
|
@ -6823,12 +6837,12 @@ static void *superSubscribe(void *sarg) {
|
||||||
|
|
||||||
debugPrint("%s() LN%d, [%d] subSqlstr: %s\n",
|
debugPrint("%s() LN%d, [%d] subSqlstr: %s\n",
|
||||||
__func__, __LINE__, pThreadInfo->threadID, subSqlstr);
|
__func__, __LINE__, pThreadInfo->threadID, subSqlstr);
|
||||||
tsub[i] = subscribeImpl(
|
tsub[tsubSeq] = subscribeImpl(
|
||||||
STABLE_CLASS,
|
STABLE_CLASS,
|
||||||
pThreadInfo, subSqlstr, topic,
|
pThreadInfo, subSqlstr, topic,
|
||||||
g_queryInfo.superQueryInfo.subscribeRestart,
|
g_queryInfo.superQueryInfo.subscribeRestart,
|
||||||
g_queryInfo.superQueryInfo.subscribeInterval);
|
g_queryInfo.superQueryInfo.subscribeInterval);
|
||||||
if (NULL == tsub[i]) {
|
if (NULL == tsub[tsubSeq]) {
|
||||||
taos_close(pThreadInfo->taos);
|
taos_close(pThreadInfo->taos);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -6844,54 +6858,56 @@ static void *superSubscribe(void *sarg) {
|
||||||
while(1) {
|
while(1) {
|
||||||
for (uint64_t i = pThreadInfo->start_table_from;
|
for (uint64_t i = pThreadInfo->start_table_from;
|
||||||
i <= pThreadInfo->end_table_to; i++) {
|
i <= pThreadInfo->end_table_to; i++) {
|
||||||
if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) {
|
tsubSeq = i - pThreadInfo->start_table_from;
|
||||||
continue;
|
if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) {
|
||||||
}
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
res = taos_consume(tsub[i]);
|
res = taos_consume(tsub[tsubSeq]);
|
||||||
if (res) {
|
if (res) {
|
||||||
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
|
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
|
||||||
sprintf(pThreadInfo->fp, "%s-%d",
|
sprintf(pThreadInfo->fp, "%s-%d",
|
||||||
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
|
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
|
||||||
pThreadInfo->threadID);
|
pThreadInfo->threadID);
|
||||||
appendResultToFile(res, pThreadInfo->fp);
|
appendResultToFile(res, pThreadInfo->fp);
|
||||||
}
|
}
|
||||||
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
|
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
|
||||||
sprintf(pThreadInfo->fp, "%s-%d",
|
sprintf(pThreadInfo->fp, "%s-%d",
|
||||||
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
|
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
|
||||||
pThreadInfo->threadID);
|
pThreadInfo->threadID);
|
||||||
appendResultToFile(res, pThreadInfo->fp);
|
appendResultToFile(res, pThreadInfo->fp);
|
||||||
}
|
}
|
||||||
consumed[i] ++;
|
consumed[tsubSeq] ++;
|
||||||
|
|
||||||
if ((g_queryInfo.superQueryInfo.subscribeKeepProgress)
|
if ((g_queryInfo.superQueryInfo.subscribeKeepProgress)
|
||||||
&& (consumed[i] >=
|
&& (consumed[tsubSeq] >=
|
||||||
g_queryInfo.superQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) {
|
g_queryInfo.superQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) {
|
||||||
printf("keepProgress:%d, resub super table query: %"PRIu64"\n",
|
printf("keepProgress:%d, resub super table query: %"PRIu64"\n",
|
||||||
g_queryInfo.superQueryInfo.subscribeKeepProgress,
|
g_queryInfo.superQueryInfo.subscribeKeepProgress,
|
||||||
pThreadInfo->querySeq);
|
pThreadInfo->querySeq);
|
||||||
taos_unsubscribe(tsub,
|
taos_unsubscribe(tsub[tsubSeq],
|
||||||
g_queryInfo.superQueryInfo.subscribeKeepProgress);
|
g_queryInfo.superQueryInfo.subscribeKeepProgress);
|
||||||
consumed[i]= 0;
|
consumed[tsubSeq]= 0;
|
||||||
tsub[i] = subscribeImpl(
|
tsub[tsubSeq] = subscribeImpl(
|
||||||
STABLE_CLASS,
|
STABLE_CLASS,
|
||||||
pThreadInfo, subSqlstr, topic,
|
pThreadInfo, subSqlstr, topic,
|
||||||
g_queryInfo.superQueryInfo.subscribeRestart,
|
g_queryInfo.superQueryInfo.subscribeRestart,
|
||||||
g_queryInfo.superQueryInfo.subscribeInterval
|
g_queryInfo.superQueryInfo.subscribeInterval
|
||||||
);
|
);
|
||||||
if (NULL == tsub[i]) {
|
if (NULL == tsub[tsubSeq]) {
|
||||||
taos_close(pThreadInfo->taos);
|
taos_close(pThreadInfo->taos);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taos_free_result(res);
|
taos_free_result(res);
|
||||||
|
|
||||||
for (uint64_t i = pThreadInfo->start_table_from;
|
for (uint64_t i = pThreadInfo->start_table_from;
|
||||||
i <= pThreadInfo->end_table_to; i++) {
|
i <= pThreadInfo->end_table_to; i++) {
|
||||||
taos_unsubscribe(tsub[i], 0);
|
tsubSeq = i - pThreadInfo->start_table_from;
|
||||||
|
taos_unsubscribe(tsub[tsubSeq], 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_close(pThreadInfo->taos);
|
taos_close(pThreadInfo->taos);
|
||||||
|
|
|
@ -38,8 +38,9 @@ typedef struct STable {
|
||||||
SRWLatch latch; // TODO: implementa latch functions
|
SRWLatch latch; // TODO: implementa latch functions
|
||||||
|
|
||||||
SDataCol *lastCols;
|
SDataCol *lastCols;
|
||||||
int16_t lastColNum;
|
int16_t maxColNum;
|
||||||
int16_t maxColumnNum;
|
int16_t restoreColumnNum;
|
||||||
|
bool hasRestoreLastColumn;
|
||||||
int lastColSVersion;
|
int lastColSVersion;
|
||||||
T_REF_DECLARE()
|
T_REF_DECLARE()
|
||||||
} STable;
|
} STable;
|
||||||
|
@ -90,6 +91,7 @@ int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema);
|
||||||
int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId);
|
int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId);
|
||||||
int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema);
|
int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema);
|
||||||
STSchema* tsdbGetTableLatestSchema(STable *pTable);
|
STSchema* tsdbGetTableLatestSchema(STable *pTable);
|
||||||
|
void tsdbFreeLastColumns(STable* pTable);
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
|
static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
|
||||||
if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) {
|
if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) {
|
||||||
|
|
|
@ -75,6 +75,9 @@ struct STsdbRepo {
|
||||||
STsdbCfg save_config; // save apply config
|
STsdbCfg save_config; // save apply config
|
||||||
bool config_changed; // config changed flag
|
bool config_changed; // config changed flag
|
||||||
pthread_mutex_t save_mutex; // protect save config
|
pthread_mutex_t save_mutex; // protect save config
|
||||||
|
|
||||||
|
uint8_t hasCachedLastRow;
|
||||||
|
uint8_t hasCachedLastColumn;
|
||||||
|
|
||||||
STsdbAppH appH;
|
STsdbAppH appH;
|
||||||
STsdbStat stat;
|
STsdbStat stat;
|
||||||
|
@ -100,6 +103,7 @@ int tsdbUnlockRepo(STsdbRepo* pRepo);
|
||||||
STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo);
|
STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo);
|
||||||
int tsdbCheckCommit(STsdbRepo* pRepo);
|
int tsdbCheckCommit(STsdbRepo* pRepo);
|
||||||
int tsdbRestoreInfo(STsdbRepo* pRepo);
|
int tsdbRestoreInfo(STsdbRepo* pRepo);
|
||||||
|
int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg);
|
||||||
void tsdbGetRootDir(int repoid, char dirName[]);
|
void tsdbGetRootDir(int repoid, char dirName[]);
|
||||||
void tsdbGetDataDir(int repoid, char dirName[]);
|
void tsdbGetDataDir(int repoid, char dirName[]);
|
||||||
|
|
||||||
|
|
|
@ -113,11 +113,15 @@ int tsdbScheduleCommit(STsdbRepo *pRepo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tsdbApplyRepoConfig(STsdbRepo *pRepo) {
|
static void tsdbApplyRepoConfig(STsdbRepo *pRepo) {
|
||||||
|
pthread_mutex_lock(&pRepo->save_mutex);
|
||||||
|
|
||||||
pRepo->config_changed = false;
|
pRepo->config_changed = false;
|
||||||
STsdbCfg * pSaveCfg = &pRepo->save_config;
|
STsdbCfg * pSaveCfg = &pRepo->save_config;
|
||||||
|
STsdbCfg oldCfg;
|
||||||
int32_t oldTotalBlocks = pRepo->config.totalBlocks;
|
int32_t oldTotalBlocks = pRepo->config.totalBlocks;
|
||||||
|
|
||||||
|
memcpy(&oldCfg, &(pRepo->config), sizeof(STsdbCfg));
|
||||||
|
|
||||||
pRepo->config.compression = pRepo->save_config.compression;
|
pRepo->config.compression = pRepo->save_config.compression;
|
||||||
pRepo->config.keep = pRepo->save_config.keep;
|
pRepo->config.keep = pRepo->save_config.keep;
|
||||||
pRepo->config.keep1 = pRepo->save_config.keep1;
|
pRepo->config.keep1 = pRepo->save_config.keep1;
|
||||||
|
@ -125,10 +129,12 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) {
|
||||||
pRepo->config.cacheLastRow = pRepo->save_config.cacheLastRow;
|
pRepo->config.cacheLastRow = pRepo->save_config.cacheLastRow;
|
||||||
pRepo->config.totalBlocks = pRepo->save_config.totalBlocks;
|
pRepo->config.totalBlocks = pRepo->save_config.totalBlocks;
|
||||||
|
|
||||||
tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d),totalBlocks(%d)",
|
pthread_mutex_unlock(&pRepo->save_mutex);
|
||||||
|
|
||||||
|
tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d->%d),totalBlocks(%d->%d)",
|
||||||
REPO_ID(pRepo),
|
REPO_ID(pRepo),
|
||||||
pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2,
|
pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2,
|
||||||
pSaveCfg->totalBlocks, pSaveCfg->cacheLastRow, pSaveCfg->totalBlocks);
|
pSaveCfg->totalBlocks, oldCfg.cacheLastRow, pSaveCfg->cacheLastRow, oldTotalBlocks, pSaveCfg->totalBlocks);
|
||||||
|
|
||||||
int err = tsdbExpendPool(pRepo, oldTotalBlocks);
|
int err = tsdbExpendPool(pRepo, oldTotalBlocks);
|
||||||
if (!TAOS_SUCCEEDED(err)) {
|
if (!TAOS_SUCCEEDED(err)) {
|
||||||
|
@ -136,6 +142,12 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) {
|
||||||
REPO_ID(pRepo), oldTotalBlocks, pSaveCfg->totalBlocks, tstrerror(err));
|
REPO_ID(pRepo), oldTotalBlocks, pSaveCfg->totalBlocks, tstrerror(err));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (oldCfg.cacheLastRow != pRepo->config.cacheLastRow) {
|
||||||
|
if (tsdbLockRepo(pRepo) < 0) return;
|
||||||
|
tsdbCacheLastData(pRepo, &oldCfg);
|
||||||
|
tsdbUnlockRepo(pRepo);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *tsdbLoopCommit(void *arg) {
|
static void *tsdbLoopCommit(void *arg) {
|
||||||
|
@ -165,10 +177,8 @@ static void *tsdbLoopCommit(void *arg) {
|
||||||
pRepo = ((SCommitReq *)pNode->data)->pRepo;
|
pRepo = ((SCommitReq *)pNode->data)->pRepo;
|
||||||
|
|
||||||
// check if need to apply new config
|
// check if need to apply new config
|
||||||
if (pRepo->config_changed) {
|
if (pRepo->config_changed) {
|
||||||
pthread_mutex_lock(&pRepo->save_mutex);
|
|
||||||
tsdbApplyRepoConfig(pRepo);
|
tsdbApplyRepoConfig(pRepo);
|
||||||
pthread_mutex_unlock(&pRepo->save_mutex);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbCommitData(pRepo);
|
tsdbCommitData(pRepo);
|
||||||
|
|
|
@ -548,6 +548,8 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
pRepo->config_changed = false;
|
pRepo->config_changed = false;
|
||||||
|
atomic_store_8(&pRepo->hasCachedLastRow, 0);
|
||||||
|
atomic_store_8(&pRepo->hasCachedLastColumn, 0);
|
||||||
|
|
||||||
code = tsem_init(&(pRepo->readyToCommit), 0, 1);
|
code = tsem_init(&(pRepo->readyToCommit), 0, 1);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -636,7 +638,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
|
||||||
int err = 0;
|
int err = 0;
|
||||||
|
|
||||||
numColumns = schemaNCols(pSchema);
|
numColumns = schemaNCols(pSchema);
|
||||||
if (numColumns <= pTable->maxColumnNum) {
|
if (numColumns <= pTable->restoreColumnNum) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
if (pTable->lastColSVersion != schemaVersion(pSchema)) {
|
if (pTable->lastColSVersion != schemaVersion(pSchema)) {
|
||||||
|
@ -675,7 +677,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
|
||||||
SBlockIdx *pIdx = pReadh->pBlkIdx;
|
SBlockIdx *pIdx = pReadh->pBlkIdx;
|
||||||
blockIdx = (int32_t)(pIdx->numOfBlocks - 1);
|
blockIdx = (int32_t)(pIdx->numOfBlocks - 1);
|
||||||
|
|
||||||
while (numColumns > pTable->maxColumnNum && blockIdx >= 0) {
|
while (numColumns > pTable->restoreColumnNum && blockIdx >= 0) {
|
||||||
bool loadStatisData = false;
|
bool loadStatisData = false;
|
||||||
pBlock = pReadh->pBlkInfo->blocks + blockIdx;
|
pBlock = pReadh->pBlkInfo->blocks + blockIdx;
|
||||||
blockIdx -= 1;
|
blockIdx -= 1;
|
||||||
|
@ -693,7 +695,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
|
||||||
loadStatisData = true;
|
loadStatisData = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int16_t i = 0; i < numColumns && numColumns > pTable->maxColumnNum; ++i) {
|
for (int16_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) {
|
||||||
STColumn *pCol = schemaColAt(pSchema, i);
|
STColumn *pCol = schemaColAt(pSchema, i);
|
||||||
// ignore loaded columns
|
// ignore loaded columns
|
||||||
if (pTable->lastCols[i].bytes != 0) {
|
if (pTable->lastCols[i].bytes != 0) {
|
||||||
|
@ -733,9 +735,9 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
|
||||||
tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset);
|
tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset);
|
||||||
pLastCol->ts = dataRowKey(row);
|
pLastCol->ts = dataRowKey(row);
|
||||||
|
|
||||||
pTable->maxColumnNum += 1;
|
pTable->restoreColumnNum += 1;
|
||||||
|
|
||||||
tsdbInfo("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts);
|
tsdbDebug("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -766,7 +768,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
||||||
for (int i = 1; i < pMeta->maxTables; i++) {
|
for (int i = 1; i < pMeta->maxTables; i++) {
|
||||||
STable *pTable = pMeta->tables[i];
|
STable *pTable = pMeta->tables[i];
|
||||||
if (pTable == NULL) continue;
|
if (pTable == NULL) continue;
|
||||||
pTable->maxColumnNum = 0;
|
pTable->restoreColumnNum = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -841,5 +843,165 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbDestroyReadH(&readh);
|
tsdbDestroyReadH(&readh);
|
||||||
|
if (CACHE_LAST_ROW(pCfg)) {
|
||||||
|
atomic_store_8(&pRepo->hasCachedLastRow, 1);
|
||||||
|
}
|
||||||
|
if (CACHE_LAST_NULL_COLUMN(pCfg)) {
|
||||||
|
atomic_store_8(&pRepo->hasCachedLastColumn, 1);
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
|
||||||
|
bool cacheLastRow = false, cacheLastCol = false;
|
||||||
|
SFSIter fsiter;
|
||||||
|
SReadH readh;
|
||||||
|
SDFileSet *pSet;
|
||||||
|
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||||
|
//STsdbCfg * pCfg = REPO_CFG(pRepo);
|
||||||
|
SBlock * pBlock;
|
||||||
|
int tableNum = 0;
|
||||||
|
int maxTableIdx = 0;
|
||||||
|
int cacheLastRowTableNum = 0;
|
||||||
|
int cacheLastColTableNum = 0;
|
||||||
|
|
||||||
|
bool need_free_last_row = CACHE_LAST_ROW(oldCfg) && !CACHE_LAST_ROW(&(pRepo->config));
|
||||||
|
bool need_free_last_col = CACHE_LAST_NULL_COLUMN(oldCfg) && !CACHE_LAST_NULL_COLUMN(&(pRepo->config));
|
||||||
|
|
||||||
|
if (CACHE_LAST_ROW(&(pRepo->config)) || CACHE_LAST_NULL_COLUMN(&(pRepo->config))) {
|
||||||
|
tsdbInfo("tsdbCacheLastData cache last data since cacheLast option changed");
|
||||||
|
cacheLastRow = !CACHE_LAST_ROW(oldCfg) && CACHE_LAST_ROW(&(pRepo->config));
|
||||||
|
cacheLastCol = !CACHE_LAST_NULL_COLUMN(oldCfg) && CACHE_LAST_NULL_COLUMN(&(pRepo->config));
|
||||||
|
}
|
||||||
|
|
||||||
|
// calc max table idx and table num
|
||||||
|
for (int i = 1; i < pMeta->maxTables; i++) {
|
||||||
|
STable *pTable = pMeta->tables[i];
|
||||||
|
if (pTable == NULL) continue;
|
||||||
|
tableNum += 1;
|
||||||
|
maxTableIdx = i;
|
||||||
|
if (cacheLastCol) {
|
||||||
|
pTable->restoreColumnNum = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// if close last option,need to free data
|
||||||
|
if (need_free_last_row || need_free_last_col) {
|
||||||
|
if (need_free_last_row) {
|
||||||
|
atomic_store_8(&pRepo->hasCachedLastRow, 0);
|
||||||
|
}
|
||||||
|
if (need_free_last_col) {
|
||||||
|
atomic_store_8(&pRepo->hasCachedLastColumn, 0);
|
||||||
|
}
|
||||||
|
tsdbInfo("free cache last data since cacheLast option changed");
|
||||||
|
for (int i = 1; i < maxTableIdx; i++) {
|
||||||
|
STable *pTable = pMeta->tables[i];
|
||||||
|
if (pTable == NULL) continue;
|
||||||
|
if (need_free_last_row) {
|
||||||
|
taosTZfree(pTable->lastRow);
|
||||||
|
pTable->lastRow = NULL;
|
||||||
|
pTable->lastKey = TSKEY_INITIAL_VAL;
|
||||||
|
}
|
||||||
|
if (need_free_last_col) {
|
||||||
|
tsdbFreeLastColumns(pTable);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!cacheLastRow && !cacheLastCol) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
cacheLastRowTableNum = cacheLastRow ? tableNum : 0;
|
||||||
|
cacheLastColTableNum = cacheLastCol ? tableNum : 0;
|
||||||
|
|
||||||
|
if (tsdbInitReadH(&readh, pRepo) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_BACKWARD);
|
||||||
|
|
||||||
|
while ((pSet = tsdbFSIterNext(&fsiter)) != NULL && (cacheLastRowTableNum > 0 || cacheLastColTableNum > 0)) {
|
||||||
|
if (tsdbSetAndOpenReadFSet(&readh, pSet) < 0) {
|
||||||
|
tsdbDestroyReadH(&readh);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tsdbLoadBlockIdx(&readh) < 0) {
|
||||||
|
tsdbDestroyReadH(&readh);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 1; i <= maxTableIdx; i++) {
|
||||||
|
STable *pTable = pMeta->tables[i];
|
||||||
|
if (pTable == NULL) continue;
|
||||||
|
|
||||||
|
//tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data);
|
||||||
|
|
||||||
|
if (tsdbSetReadTable(&readh, pTable) < 0) {
|
||||||
|
tsdbDestroyReadH(&readh);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SBlockIdx *pIdx = readh.pBlkIdx;
|
||||||
|
|
||||||
|
if (pIdx && cacheLastRowTableNum > 0 && pTable->lastRow == NULL) {
|
||||||
|
pTable->lastKey = pIdx->maxKey;
|
||||||
|
|
||||||
|
if (tsdbLoadBlockInfo(&readh, NULL) < 0) {
|
||||||
|
tsdbDestroyReadH(&readh);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pBlock = readh.pBlkInfo->blocks + pIdx->numOfBlocks - 1;
|
||||||
|
|
||||||
|
if (tsdbLoadBlockData(&readh, pBlock, NULL) < 0) {
|
||||||
|
tsdbDestroyReadH(&readh);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the data in row
|
||||||
|
ASSERT(pTable->lastRow == NULL);
|
||||||
|
STSchema *pSchema = tsdbGetTableSchema(pTable);
|
||||||
|
pTable->lastRow = taosTMalloc(dataRowMaxBytesFromSchema(pSchema));
|
||||||
|
if (pTable->lastRow == NULL) {
|
||||||
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
tsdbDestroyReadH(&readh);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tdInitDataRow(pTable->lastRow, pSchema);
|
||||||
|
for (int icol = 0; icol < schemaNCols(pSchema); icol++) {
|
||||||
|
STColumn *pCol = schemaColAt(pSchema, icol);
|
||||||
|
SDataCol *pDataCol = readh.pDCols[0]->cols + icol;
|
||||||
|
tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes,
|
||||||
|
pCol->offset);
|
||||||
|
}
|
||||||
|
cacheLastRowTableNum -= 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// restore NULL columns
|
||||||
|
if (pIdx && cacheLastColTableNum > 0 && !pTable->hasRestoreLastColumn) {
|
||||||
|
if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) {
|
||||||
|
tsdbDestroyReadH(&readh);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (pTable->hasRestoreLastColumn) {
|
||||||
|
cacheLastColTableNum -= 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbDestroyReadH(&readh);
|
||||||
|
|
||||||
|
if (cacheLastRow) {
|
||||||
|
atomic_store_8(&pRepo->hasCachedLastRow, 1);
|
||||||
|
}
|
||||||
|
if (cacheLastCol) {
|
||||||
|
atomic_store_8(&pRepo->hasCachedLastColumn, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -44,7 +44,6 @@ static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable);
|
||||||
static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable);
|
static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable);
|
||||||
static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid);
|
static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid);
|
||||||
static int tsdbCheckTableTagVal(SKVRow *pKVRow, STSchema *pSchema);
|
static int tsdbCheckTableTagVal(SKVRow *pKVRow, STSchema *pSchema);
|
||||||
static void tsdbFreeLastColumns(STable* pTable);
|
|
||||||
|
|
||||||
// ------------------ OUTER FUNCTIONS ------------------
|
// ------------------ OUTER FUNCTIONS ------------------
|
||||||
int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg) {
|
int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg) {
|
||||||
|
@ -590,12 +589,12 @@ void tsdbUnRefTable(STable *pTable) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tsdbFreeLastColumns(STable* pTable) {
|
void tsdbFreeLastColumns(STable* pTable) {
|
||||||
if (pTable->lastCols == NULL) {
|
if (pTable->lastCols == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < pTable->lastColNum; ++i) {
|
for (int i = 0; i < pTable->maxColNum; ++i) {
|
||||||
if (pTable->lastCols[i].bytes == 0) {
|
if (pTable->lastCols[i].bytes == 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -605,14 +604,16 @@ static void tsdbFreeLastColumns(STable* pTable) {
|
||||||
}
|
}
|
||||||
tfree(pTable->lastCols);
|
tfree(pTable->lastCols);
|
||||||
pTable->lastCols = NULL;
|
pTable->lastCols = NULL;
|
||||||
pTable->lastColNum = 0;
|
pTable->maxColNum = 0;
|
||||||
|
pTable->lastColSVersion = -1;
|
||||||
|
pTable->restoreColumnNum = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId) {
|
int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId) {
|
||||||
if (pTable->lastCols == NULL) {
|
if (pTable->lastCols == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
for (int16_t i = 0; i < pTable->lastColNum; ++i) {
|
for (int16_t i = 0; i < pTable->maxColNum; ++i) {
|
||||||
if (pTable->lastCols[i].colId == colId) {
|
if (pTable->lastCols[i].colId == colId) {
|
||||||
return i;
|
return i;
|
||||||
}
|
}
|
||||||
|
@ -640,8 +641,8 @@ int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pTable->lastColSVersion = schemaVersion(pSchema);
|
pTable->lastColSVersion = schemaVersion(pSchema);
|
||||||
pTable->lastColNum = numOfColumn;
|
pTable->maxColNum = numOfColumn;
|
||||||
pTable->maxColumnNum = 0;
|
pTable->restoreColumnNum = 0;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -682,11 +683,11 @@ int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataCol *oldLastCols = pTable->lastCols;
|
SDataCol *oldLastCols = pTable->lastCols;
|
||||||
int16_t oldLastColNum = pTable->lastColNum;
|
int16_t oldLastColNum = pTable->maxColNum;
|
||||||
|
|
||||||
pTable->lastColSVersion = schemaVersion(pNewSchema);
|
pTable->lastColSVersion = schemaVersion(pNewSchema);
|
||||||
pTable->lastCols = lastCols;
|
pTable->lastCols = lastCols;
|
||||||
pTable->lastColNum = numOfCols;
|
pTable->maxColNum = numOfCols;
|
||||||
|
|
||||||
if (oldLastCols == NULL) {
|
if (oldLastCols == NULL) {
|
||||||
TSDB_WUNLOCK_TABLE(pTable);
|
TSDB_WUNLOCK_TABLE(pTable);
|
||||||
|
@ -797,8 +798,8 @@ static STable *tsdbNewTable() {
|
||||||
pTable->lastKey = TSKEY_INITIAL_VAL;
|
pTable->lastKey = TSKEY_INITIAL_VAL;
|
||||||
|
|
||||||
pTable->lastCols = NULL;
|
pTable->lastCols = NULL;
|
||||||
pTable->maxColumnNum = 0;
|
pTable->restoreColumnNum = 0;
|
||||||
pTable->lastColNum = 0;
|
pTable->maxColNum = 0;
|
||||||
pTable->lastColSVersion = -1;
|
pTable->lastColSVersion = -1;
|
||||||
return pTable;
|
return pTable;
|
||||||
}
|
}
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue