From d1c9131772ef54e975937761162751acb341230f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 9 Nov 2020 23:55:07 +0000 Subject: [PATCH 01/28] add async write --- src/kit/taosdemo/taosdemo.c | 89 +++++++++++++------------------------ 1 file changed, 30 insertions(+), 59 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 74dc239cb5..f37c7cb632 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -475,6 +475,7 @@ typedef struct { tsem_t mutex_sem; int notFinished; tsem_t lock_sem; + int counter; } info; typedef struct { @@ -766,6 +767,7 @@ int main(int argc, char *argv[]) { t_info->data_of_rate = rate; t_info->end_table_id = i < b ? last + a : last + a - 1; last = t_info->end_table_id + 1; + t_info->counter = 0; tsem_init(&(t_info->mutex_sem), 0, 1); t_info->notFinished = t_info->end_table_id - t_info->start_table_id + 1; @@ -793,9 +795,9 @@ int main(int argc, char *argv[]) { (ntables * nrecords_per_table) / (t * nrecords_per_request), t * 1000); - printf("Spent %.4f seconds to insert %lld records with %d record(s) per request: %.2f records/second\n", - t, (long long int)ntables * nrecords_per_table, nrecords_per_request, - ((long long int)ntables * nrecords_per_table) / t); + printf("Spent %.4f seconds to insert %d records with %d record(s) per request: %.2f records/second\n", + t, ntables * nrecords_per_table, nrecords_per_request, + ntables * nrecords_per_table / t); for (int i = 0; i < threads; i++) { info *t_info = infos + i; @@ -955,7 +957,7 @@ void querySqlFile(TAOS* taos, char* sqlFile) double t = getCurrentTime(); - while ((read_len = tgetline(&line, &line_len, fp)) != -1) { + while ((read_len = taosGetline(&line, &line_len, fp)) != -1) { if (read_len >= MAX_SQL_SIZE) continue; line[--read_len] = '\0'; @@ -1283,68 +1285,37 @@ void *syncWrite(void *sarg) { void *asyncWrite(void *sarg) { info *winfo = (info *)sarg; - - sTable *tb_infos = (sTable *)malloc(sizeof(sTable) * (winfo->end_table_id - winfo->start_table_id + 1)); - - for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { - sTable *tb_info = tb_infos + tID - winfo->start_table_id; - tb_info->data_type = winfo->datatype; - tb_info->ncols_per_record = winfo->ncols_per_record; - tb_info->taos = winfo->taos; - sprintf(tb_info->tb_name, "%s.%s%d", winfo->db_name, winfo->tb_prefix, tID); - tb_info->timestamp = winfo->start_time; - tb_info->counter = 0; - tb_info->target = winfo->nrecords_per_table; - tb_info->len_of_binary = winfo->len_of_binary; - tb_info->nrecords_per_request = winfo->nrecords_per_request; - tb_info->mutex_sem = &(winfo->mutex_sem); - tb_info->notFinished = &(winfo->notFinished); - tb_info->lock_sem = &(winfo->lock_sem); - tb_info->data_of_order = winfo->data_of_order; - tb_info->data_of_rate = winfo->data_of_rate; - - /* char buff[BUFFER_SIZE] = "\0"; */ - /* sprintf(buff, "insert into %s values (0, 0)", tb_info->tb_name); */ - /* queryDB(tb_info->taos,buff); */ - - taos_query_a(winfo->taos, "show databases", callBack, tb_info); - } + taos_query_a(winfo->taos, "show databases", callBack, winfo); tsem_wait(&(winfo->lock_sem)); - free(tb_infos); return NULL; } void callBack(void *param, TAOS_RES *res, int code) { - sTable *tb_info = (sTable *)param; - char **datatype = tb_info->data_type; - int ncols_per_record = tb_info->ncols_per_record; - int len_of_binary = tb_info->len_of_binary; - int64_t tmp_time = tb_info->timestamp; + info* winfo = (info*)param; + char **datatype = winfo->datatype; + int ncols_per_record = winfo->ncols_per_record; + int len_of_binary = winfo->len_of_binary; - if (code < 0) { - fprintf(stderr, "failed to insert data %d:reason; %s\n", code, taos_errstr(res)); - exit(EXIT_FAILURE); + int64_t tmp_time = winfo->start_time; + char *buffer = calloc(1, BUFFER_SIZE); + char *data = calloc(1, MAX_DATA_SIZE); + char *pstr = buffer; + pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, winfo->start_table_id); + if (winfo->counter >= winfo->nrecords_per_table) { + winfo->start_table_id++; + winfo->counter = 0; } - - // If finished; - if (tb_info->counter >= tb_info->target) { - tsem_wait(tb_info->mutex_sem); - (*(tb_info->notFinished))--; - if (*(tb_info->notFinished) == 0) tsem_post(tb_info->lock_sem); - tsem_post(tb_info->mutex_sem); + if (winfo->start_table_id > winfo->end_table_id) { + tsem_post(&winfo->lock_sem); + taos_free_result(res); return; } - - char buffer[BUFFER_SIZE] = "\0"; - char data[MAX_DATA_SIZE]; - char *pstr = buffer; - pstr += sprintf(pstr, "insert into %s values", tb_info->tb_name); - - for (int i = 0; i < tb_info->nrecords_per_request; i++) { + + for (int i = 0; i < winfo->nrecords_per_request; i++) { int rand_num = rand() % 100; - if (tb_info->data_of_order ==1 && rand_num < tb_info->data_of_rate) + if (winfo->data_of_order ==1 && rand_num < winfo->data_of_rate) { int64_t d = tmp_time - rand() % 1000000 + rand_num; generateData(data, datatype, ncols_per_record, d, len_of_binary); @@ -1353,15 +1324,15 @@ void callBack(void *param, TAOS_RES *res, int code) { generateData(data, datatype, ncols_per_record, tmp_time += 1000, len_of_binary); } pstr += sprintf(pstr, "%s", data); - tb_info->counter++; + winfo->counter++; - if (tb_info->counter >= tb_info->target) { + if (winfo->counter >= winfo->nrecords_per_table) { break; } } - tb_info->timestamp = tmp_time; - - taos_query_a(tb_info->taos, buffer, callBack, tb_info); + taos_query_a(winfo->taos, buffer, callBack, winfo); + free(buffer); + free(data); taos_free_result(res); } From 01d1782d5bc89e0129210c17f839f471ba57b608 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 10 Nov 2020 01:28:05 +0000 Subject: [PATCH 02/28] rewrite async write --- src/kit/taosdemo/taosdemo.c | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index f37c7cb632..60785ec004 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -790,14 +790,14 @@ int main(int argc, char *argv[]) { printf("ASYNC Insert with %d connections:\n", threads); } - fprintf(fp, "|%10.d | %10.2f | %10.2f | %10.4f |\n\n", - ntables * nrecords_per_table, ntables * nrecords_per_table / t, - (ntables * nrecords_per_table) / (t * nrecords_per_request), + fprintf(fp, "|%"PRIu64" | %10.2f | %10.2f | %10.4f |\n\n", + (int64_t)ntables * nrecords_per_table, ntables * nrecords_per_table / t, + ((int64_t)ntables * nrecords_per_table) / (t * nrecords_per_request), t * 1000); - printf("Spent %.4f seconds to insert %d records with %d record(s) per request: %.2f records/second\n", - t, ntables * nrecords_per_table, nrecords_per_request, - ntables * nrecords_per_table / t); + printf("Spent %.4f seconds to insert %"PRIu64" records with %d record(s) per request: %.2f records/second\n", + t, (int64_t)ntables * nrecords_per_table, nrecords_per_request, + (int64_t)ntables * nrecords_per_table / t); for (int i = 0; i < threads; i++) { info *t_info = infos + i; @@ -957,7 +957,7 @@ void querySqlFile(TAOS* taos, char* sqlFile) double t = getCurrentTime(); - while ((read_len = taosGetline(&line, &line_len, fp)) != -1) { + while ((read_len = tgetline(&line, &line_len, fp)) != -1) { if (read_len >= MAX_SQL_SIZE) continue; line[--read_len] = '\0'; @@ -1309,6 +1309,8 @@ void callBack(void *param, TAOS_RES *res, int code) { } if (winfo->start_table_id > winfo->end_table_id) { tsem_post(&winfo->lock_sem); + free(buffer); + free(data); taos_free_result(res); return; } From 941834a659306bf892a5861f351c2a01312396c2 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 10 Nov 2020 05:39:09 +0000 Subject: [PATCH 03/28] try to fix TD-2004 --- src/inc/tsdb.h | 3 + src/tsdb/inc/tsdbMain.h | 7 +- src/tsdb/src/tsdbCommitQueue.c | 149 +++++++++++++++++++++++++++++++ src/tsdb/src/tsdbMain.c | 9 +- src/tsdb/src/tsdbMemTable.c | 156 ++++++++++++++------------------- src/vnode/src/vnodeMain.c | 9 ++ 6 files changed, 241 insertions(+), 92 deletions(-) create mode 100644 src/tsdb/src/tsdbCommitQueue.c diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 02ca99e4b8..993ec287a5 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -321,6 +321,9 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle); */ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage); +int tsdbInitCommitQueue(int nthreads); +void tsdbDestroyCommitQueue(); + #ifdef __cplusplus } #endif diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 7d40d7f00a..0962e7c2cb 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -220,8 +220,7 @@ typedef struct { SMemTable* mem; SMemTable* imem; STsdbFileH* tsdbFileH; - int commit; - pthread_t commitThread; + sem_t readyToCommit; pthread_mutex_t mutex; bool repoLocked; } STsdbRepo; @@ -440,6 +439,7 @@ void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); int tsdbAsyncCommit(STsdbRepo* pRepo); int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo); +void* tsdbCommitData(STsdbRepo* pRepo); static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) { if (pIter == NULL) return NULL; @@ -588,6 +588,9 @@ int tsdbScanSCompBlock(STsdbScanHandle* pScanHandle, int idx); int tsdbCloseScanFile(STsdbScanHandle* pScanHandle); void tsdbFreeScanHandle(STsdbScanHandle* pScanHandle); +// ------------------ tsdbCommitQueue.c +int tsdbScheduleCommit(STsdbRepo *pRepo); + #ifdef __cplusplus } #endif diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c new file mode 100644 index 0000000000..0272d7b398 --- /dev/null +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include + +#include "os.h" +#include "tlist.h" +#include "tsdbMain.h" + +typedef struct { + bool stop; + pthread_mutex_t lock; + pthread_cond_t queueNotEmpty; + int nthreads; + SList * queue; + pthread_t * threads; +} SCommitQueue; + +typedef struct { + STsdbRepo *pRepo; +} SCommitReq; + +static void *tsdbLoopCommit(void *arg); + +SCommitQueue tsCommitQueue = {0}; + +int tsdbInitCommitQueue(int nthreads) { + SCommitQueue *pQueue = &tsCommitQueue; + + if (nthreads < 1) nthreads = 1; + + pQueue->stop = false; + pQueue->nthreads = nthreads; + + pQueue->queue = tdListNew(0); + if (pQueue->queue == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + pQueue->threads = (pthread_t *)calloc(nthreads, sizeof(pthread_t)); + if (pQueue->threads == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tdListFree(pQueue->queue); + return -1; + } + + pthread_mutex_init(&(pQueue->lock), NULL); + pthread_cond_init(&(pQueue->queueNotEmpty), NULL); + + for (int i = 0; i < nthreads; i++) { + pthread_create(pQueue->threads + i, NULL, tsdbLoopCommit, NULL); + } + + return 0; +} + +void tsdbDestroyCommitQueue() { + SCommitQueue *pQueue = &tsCommitQueue; + + pthread_mutex_lock(&(pQueue->lock)); + + if (pQueue->stop) { + pthread_mutex_unlock(&(pQueue->lock)); + return; + } + + pQueue->stop = true; + pthread_cond_broadcast(&(pQueue->queueNotEmpty)); + + pthread_mutex_unlock(&(pQueue->lock)); + + for (size_t i = 0; i < pQueue->nthreads; i++) { + pthread_join(pQueue->threads[i], NULL); + } + + free(pQueue->threads); + tdListFree(pQueue->queue); + pthread_cond_destroy(&(pQueue->queueNotEmpty)); + pthread_mutex_destroy(&(pQueue->lock)); +} + +int tsdbScheduleCommit(STsdbRepo *pRepo) { + SCommitQueue *pQueue = &tsCommitQueue; + + SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(SCommitReq)); + if (pNode == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + ((SCommitReq *)pNode->data)->pRepo = pRepo; + + pthread_mutex_lock(&(pQueue->lock)); + + ASSERT(!pQueue->stop); + + tdListAppendNode(pQueue->queue, pNode); + pthread_cond_signal(&(pQueue->queueNotEmpty)); + + pthread_mutex_unlock(&(pQueue->lock)); + return 0; +} + +static void *tsdbLoopCommit(void *arg) { + SCommitQueue *pQueue = &tsCommitQueue; + SListNode * pNode = NULL; + STsdbRepo * pRepo = NULL; + + while (true) { + pthread_mutex_lock(&(pQueue->lock)); + + while (true) { + pNode = tdListPopHead(pQueue->queue); + if (pNode == NULL) { + if (pQueue->stop) { + pthread_mutex_unlock(&(pQueue->lock)); + goto _exit; + } else { + pthread_cond_wait(&(pQueue->queueNotEmpty), &(pQueue->lock)); + } + } else { + break; + } + } + + pthread_mutex_unlock(&(pQueue->lock)); + + pRepo = ((SCommitReq *)pNode->data)->pRepo; + + tsdbCommitData(pRepo); + listNodeFree(pNode); + } + +_exit: + return NULL; +} diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 9a7c2db3d3..677ef20416 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -163,7 +163,7 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { if (toCommit) { tsdbAsyncCommit(pRepo); - if (pRepo->commit) pthread_join(pRepo->commitThread, NULL); + sem_wait(&(pRepo->readyToCommit)); } tsdbUnRefMemTable(pRepo, pRepo->mem); tsdbUnRefMemTable(pRepo, pRepo->imem); @@ -675,6 +675,12 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) { goto _err; } + code = sem_init(&(pRepo->readyToCommit), 0, 1); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + goto _err; + } + pRepo->repoLocked = false; pRepo->rootDir = strdup(rootDir); @@ -719,6 +725,7 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) { // tsdbFreeMemTable(pRepo->mem); // tsdbFreeMemTable(pRepo->imem); taosTFree(pRepo->rootDir); + sem_destroy(&(pRepo->readyToCommit)); pthread_mutex_destroy(&pRepo->mutex); free(pRepo); } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 150bda3b80..7b3c617a90 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -24,7 +24,6 @@ static void tsdbFreeMemTable(SMemTable *pMemTable); static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable); static void tsdbFreeTableData(STableData *pTableData); static char * tsdbGetTsTupleKey(const void *data); -static void * tsdbCommitData(void *arg); static int tsdbCommitMeta(STsdbRepo *pRepo); static void tsdbEndCommit(STsdbRepo *pRepo); static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey); @@ -262,39 +261,19 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { int tsdbAsyncCommit(STsdbRepo *pRepo) { SMemTable *pIMem = pRepo->imem; - int code = 0; - if (pIMem != NULL) { - ASSERT(pRepo->commit); - tsdbDebug("vgId:%d waiting for the commit thread", REPO_ID(pRepo)); - code = pthread_join(pRepo->commitThread, NULL); - tsdbDebug("vgId:%d commit thread is finished", REPO_ID(pRepo)); - if (code != 0) { - tsdbError("vgId:%d failed to thread join since %s", REPO_ID(pRepo), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - pRepo->commit = 0; - } - - ASSERT(pRepo->commit == 0); if (pRepo->mem != NULL) { + sem_wait(&(pRepo->readyToCommit)); + if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START); if (tsdbLockRepo(pRepo) < 0) return -1; pRepo->imem = pRepo->mem; pRepo->mem = NULL; - pRepo->commit = 1; - code = pthread_create(&pRepo->commitThread, NULL, tsdbCommitData, (void *)pRepo); - if (code != 0) { - tsdbError("vgId:%d failed to create commit thread since %s", REPO_ID(pRepo), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(code); - tsdbUnlockRepo(pRepo); - return -1; - } + tsdbScheduleCommit(pRepo); if (tsdbUnlockRepo(pRepo) < 0) return -1; } - if (pIMem && tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1; + if (tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1; return 0; } @@ -419,6 +398,68 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey return 0; } +void *tsdbCommitData(STsdbRepo *pRepo) { + SMemTable * pMem = pRepo->imem; + STsdbCfg * pCfg = &pRepo->config; + SDataCols * pDataCols = NULL; + STsdbMeta * pMeta = pRepo->tsdbMeta; + SCommitIter *iters = NULL; + SRWHelper whelper = {0}; + ASSERT(pMem != NULL); + + tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo), + pMem->keyFirst, pMem->keyLast, pMem->numOfRows); + + // Create the iterator to read from cache + if (pMem->numOfRows > 0) { + iters = tsdbCreateCommitIters(pRepo); + if (iters == NULL) { + tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _exit; + } + + if (tsdbInitWriteHelper(&whelper, pRepo) < 0) { + tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _exit; + } + + if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s", + REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno)); + goto _exit; + } + + int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision)); + int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision)); + + // Loop to commit to each file + for (int fid = sfid; fid <= efid; fid++) { + if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) { + tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); + goto _exit; + } + } + } + + // Commit to update meta file + if (tsdbCommitMeta(pRepo) < 0) { + tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _exit; + } + + tsdbFitRetention(pRepo); + +_exit: + tdFreeDataCols(pDataCols); + tsdbDestroyCommitIters(iters, pMem->maxTables); + tsdbDestroyHelper(&whelper); + tsdbEndCommit(pRepo); + tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId); + + return NULL; +} + // ---------------- LOCAL FUNCTIONS ---------------- static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) { ASSERT(pRepo->mem != NULL); @@ -529,69 +570,6 @@ static void tsdbFreeTableData(STableData *pTableData) { static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple((SDataRow)data); } -static void *tsdbCommitData(void *arg) { - STsdbRepo * pRepo = (STsdbRepo *)arg; - SMemTable * pMem = pRepo->imem; - STsdbCfg * pCfg = &pRepo->config; - SDataCols * pDataCols = NULL; - STsdbMeta * pMeta = pRepo->tsdbMeta; - SCommitIter *iters = NULL; - SRWHelper whelper = {0}; - ASSERT(pRepo->commit == 1); - ASSERT(pMem != NULL); - - tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo), - pMem->keyFirst, pMem->keyLast, pMem->numOfRows); - - // Create the iterator to read from cache - if (pMem->numOfRows > 0) { - iters = tsdbCreateCommitIters(pRepo); - if (iters == NULL) { - tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _exit; - } - - if (tsdbInitWriteHelper(&whelper, pRepo) < 0) { - tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _exit; - } - - if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s", - REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno)); - goto _exit; - } - - int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision)); - int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision)); - - // Loop to commit to each file - for (int fid = sfid; fid <= efid; fid++) { - if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) { - tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); - goto _exit; - } - } - } - - // Commit to update meta file - if (tsdbCommitMeta(pRepo) < 0) { - tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _exit; - } - - tsdbFitRetention(pRepo); - -_exit: - tdFreeDataCols(pDataCols); - tsdbDestroyCommitIters(iters, pMem->maxTables); - tsdbDestroyHelper(&whelper); - tsdbEndCommit(pRepo); - tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId); - - return NULL; -} static int tsdbCommitMeta(STsdbRepo *pRepo) { SMemTable *pMem = pRepo->imem; @@ -642,8 +620,8 @@ _err: } static void tsdbEndCommit(STsdbRepo *pRepo) { - ASSERT(pRepo->commit == 1); if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER); + sem_post(&(pRepo->readyToCommit)); } static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index f90b9701fe..17a8581bd2 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -28,6 +28,8 @@ #include "vnodeCfg.h" #include "vnodeVersion.h" +#define DEFAULT_COMMIT_THREADS 1 + static SHashObj*tsVnodesHash; static void vnodeCleanUp(SVnodeObj *pVnode); static int vnodeProcessTsdbStatus(void *arg, int status); @@ -67,10 +69,17 @@ int32_t vnodeInitResources() { return TSDB_CODE_VND_OUT_OF_MEMORY; } + if (tsdbInitCommitQueue(DEFAULT_COMMIT_THREADS) < 0) { + vError("failed to init vnode commit queue"); + return terrno; + } + return TSDB_CODE_SUCCESS; } void vnodeCleanupResources() { + tsdbDestroyCommitQueue(); + if (tsVnodesHash != NULL) { vDebug("vnode list is cleanup"); taosHashCleanup(tsVnodesHash); From e9f848f6358cbf7f9bac8c5d270870b68da35e0c Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Wed, 11 Nov 2020 14:31:44 +0800 Subject: [PATCH 04/28] [TD-2055]: core dump in wildcard query --- src/tsdb/src/tsdbRead.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 021c10ab6a..8bbdf4e362 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -2348,7 +2348,8 @@ void filterPrepare(void* expr, void* param) { if (size < (uint32_t)pSchema->bytes) { size = pSchema->bytes; } - pInfo->q = calloc(1, size + TSDB_NCHAR_SIZE); // to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(wchar_t) space. + // to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(wchar_t) space. + pInfo->q = calloc(1, size + TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE); tVariantDump(pCond, pInfo->q, pSchema->type, true); } } From bf804ef644943d8e16a1d1788a313c6ce7ab1831 Mon Sep 17 00:00:00 2001 From: liuyq-617 Date: Wed, 11 Nov 2020 16:47:50 +0800 Subject: [PATCH 05/28] [TD-2062]modify taos website --- .../webdocs/markdowndocs/architecture-ch.md | 103 ++++++++++++++++-- 1 file changed, 93 insertions(+), 10 deletions(-) diff --git a/documentation20/webdocs/markdowndocs/architecture-ch.md b/documentation20/webdocs/markdowndocs/architecture-ch.md index d4705ccb05..47aa33352d 100644 --- a/documentation20/webdocs/markdowndocs/architecture-ch.md +++ b/documentation20/webdocs/markdowndocs/architecture-ch.md @@ -4,16 +4,99 @@ ### 物联网典型场景 在典型的物联网、车联网、运维监测场景中,往往有多种不同类型的数据采集设备,采集一个到多个不同的物理量。而同一种采集设备类型,往往又有多个具体的采集设备分布在不同的地点。大数据处理系统就是要将各种采集的数据汇总,然后进行计算和分析。对于同一类设备,其采集的数据都是很规则的。以智能电表为例,假设每个智能电表采集电流、电压、相位三个量,其采集的数据类似如下的表格: -| Device ID | Time Stamp | current | voltage | phase | location | groupId | -| :-------: | :-----------: | :-----: | :-----: | :---: | :--------------: | :-----: | -| d1001 | 1538548685000 | 10.3 | 219 | 0.31 | Beijing.Chaoyang | 2 | -| d1002 | 1538548684000 | 10.2 | 220 | 0.23 | Beijing.Chaoyang | 3 | -| d1003 | 1538548686500 | 11.5 | 221 | 0.35 | Beijing.Haidian | 3 | -| d1004 | 1538548685500 | 13.4 | 223 | 0.29 | Beijing.Haidian | 2 | -| d1001 | 1538548695000 | 12.6 | 218 | 0.33 | Beijing.Chaoyang | 2 | -| d1004 | 1538548696600 | 11.8 | 221 | 0.28 | Beijing.Haidian | 2 | -| d1002 | 1538548696650 | 10.3 | 218 | 0.25 | Beijing.Chaoyang | 3 | -| d1001 | 1538548696800 | 12.3 | 221 | 0.31 | Beijing.Chaoyang | 2 | +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
设备ID时间戳采集量标签
Device IDTime StampcurrentvoltagephaselocationgroupId
d1001153854868500010.32190.31Beijing.Chaoyang2
d1002153854868400010.22200.23Beijing.Chaoyang3
d1003153854868650011.52210.35Beijing.Haidian3
d1004153854868550013.42230.29Beijing.Haidian2
d1001153854869500012.62180.33Beijing.Chaoyang2
d1004153854869660011.82210.28Beijing.Haidian2
d1002153854869665010.32180.25Beijing.Chaoyang3
d1001153854869680012.32210.31Beijing.Chaoyang2
表1:智能电表数据示例
From 97cb2e4d235364d5b703476a4d25ce4851b94089 Mon Sep 17 00:00:00 2001 From: Hui Li Date: Wed, 11 Nov 2020 16:55:32 +0800 Subject: [PATCH 06/28] [TD-2063] not free result resource in async callback function --- tests/examples/c/asyncdemo.c | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/examples/c/asyncdemo.c b/tests/examples/c/asyncdemo.c index 225c4f7541..2765dcd895 100644 --- a/tests/examples/c/asyncdemo.c +++ b/tests/examples/c/asyncdemo.c @@ -211,6 +211,8 @@ void taos_insert_call_back(void *param, TAOS_RES *tres, int code) printf("%lld mseconds to insert %d data points\n", (et - st) / 1000, points*numOfTables); } } + + taos_free_result(tres); } void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows) @@ -222,7 +224,7 @@ void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows) for (int i = 0; iname, numOfRows); - taos_free_result(tres); + //taos_free_result(tres); printf("%d rows data retrieved from %s\n", pTable->rowsRetrieved, pTable->name); tablesProcessed++; @@ -246,6 +248,8 @@ void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows) printf("%lld mseconds to query %d data rows\n", (et - st) / 1000, points * numOfTables); } } + + taos_free_result(tres); } void taos_select_call_back(void *param, TAOS_RES *tres, int code) @@ -263,4 +267,6 @@ void taos_select_call_back(void *param, TAOS_RES *tres, int code) printf("%s select failed, code:%d\n", pTable->name, code); exit(1); } + + taos_free_result(tres); } From 3a3fe63eabe60324bbbc5af36e8e67427bf4a3dc Mon Sep 17 00:00:00 2001 From: Hui Li Date: Wed, 11 Nov 2020 17:12:40 +0800 Subject: [PATCH 07/28] [TD-1973] no free result source in callback when use async api --- src/kit/taosdemo/taosdemo.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 74dc239cb5..6178d91044 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -879,6 +879,7 @@ int main(int argc, char *argv[]) { taos_close(rInfo->taos); } + taos_cleanup(); return 0; } @@ -1325,6 +1326,8 @@ void callBack(void *param, TAOS_RES *res, int code) { if (code < 0) { fprintf(stderr, "failed to insert data %d:reason; %s\n", code, taos_errstr(res)); + taos_free_result(res); + taos_cleanup(); exit(EXIT_FAILURE); } @@ -1334,6 +1337,7 @@ void callBack(void *param, TAOS_RES *res, int code) { (*(tb_info->notFinished))--; if (*(tb_info->notFinished) == 0) tsem_post(tb_info->lock_sem); tsem_post(tb_info->mutex_sem); + taos_free_result(res); return; } @@ -1359,7 +1363,8 @@ void callBack(void *param, TAOS_RES *res, int code) { break; } } - tb_info->timestamp = tmp_time; + + tb_info->timestamp = tmp_time; taos_query_a(tb_info->taos, buffer, callBack, tb_info); From 5575b934bf3137582f1064984149eeaf9bafeac1 Mon Sep 17 00:00:00 2001 From: Hui Li Date: Wed, 11 Nov 2020 17:36:52 +0800 Subject: [PATCH 08/28] [TD-2063] --- tests/examples/c/asyncdemo.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/examples/c/asyncdemo.c b/tests/examples/c/asyncdemo.c index 2765dcd895..c6cc89b31d 100644 --- a/tests/examples/c/asyncdemo.c +++ b/tests/examples/c/asyncdemo.c @@ -68,6 +68,7 @@ static void queryDB(TAOS *taos, char *command) { fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(pSql)); taos_free_result(pSql); taos_close(taos); + taos_cleanup(); exit(EXIT_FAILURE); } @@ -176,6 +177,7 @@ void taos_error(TAOS *con) { fprintf(stderr, "TDengine error: %s\n", taos_errstr(con)); taos_close(con); + taos_cleanup(); exit(1); } @@ -265,6 +267,8 @@ void taos_select_call_back(void *param, TAOS_RES *tres, int code) } else { printf("%s select failed, code:%d\n", pTable->name, code); + taos_free_result(tres); + taos_cleanup(); exit(1); } From 57269fd737bc98233cafb1de6f4ff6c6f0b0aa50 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 11 Nov 2020 09:51:11 +0000 Subject: [PATCH 09/28] add some log --- src/query/src/qExecutor.c | 2 +- src/sync/src/syncMain.c | 2 +- src/sync/src/tarbitrator.c | 2 +- src/tsdb/src/tsdbBuffer.c | 4 ++-- src/vnode/src/vnodeRead.c | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 17e39f031e..98a4cf2c42 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -7233,7 +7233,7 @@ void qCleanupQueryMgmt(void* pQMgmt) { pthread_mutex_destroy(&pQueryMgmt->lock); tfree(pQueryMgmt); - qDebug("vgId:%d queryMgmt cleanup completed", vgId); + qDebug("vgId:%d, queryMgmt cleanup completed", vgId); } void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) { diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index ef1ada4c2e..5c92801cc3 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -486,7 +486,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { pPeer->ip = ip; pPeer->port = pInfo->nodePort; pPeer->fqdn[sizeof(pPeer->fqdn) - 1] = 0; - snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d peer:%s:%u", pNode->vgId, pPeer->fqdn, pPeer->port); + snprintf(pPeer->id, sizeof(pPeer->id), "vgId:%d, peer:%s:%u", pNode->vgId, pPeer->fqdn, pPeer->port); pPeer->peerFd = -1; pPeer->syncFd = -1; diff --git a/src/sync/src/tarbitrator.c b/src/sync/src/tarbitrator.c index ae8f2ea518..b7f819a3cd 100644 --- a/src/sync/src/tarbitrator.c +++ b/src/sync/src/tarbitrator.c @@ -128,7 +128,7 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { } firstPkt.fqdn[sizeof(firstPkt.fqdn) - 1] = 0; - snprintf(pNode->id, sizeof(pNode->id), "vgId:%d peer:%s:%d", firstPkt.sourceId, firstPkt.fqdn, firstPkt.port); + snprintf(pNode->id, sizeof(pNode->id), "vgId:%d, peer:%s:%d", firstPkt.sourceId, firstPkt.fqdn, firstPkt.port); if (firstPkt.syncHead.vgId) { sDebug("%s, vgId in head is not zero, close the connection", pNode->id); tfree(pNode); diff --git a/src/tsdb/src/tsdbBuffer.c b/src/tsdb/src/tsdbBuffer.c index dcc9d4ca1b..7cea27658c 100644 --- a/src/tsdb/src/tsdbBuffer.c +++ b/src/tsdb/src/tsdbBuffer.c @@ -110,7 +110,7 @@ void tsdbCloseBufPool(STsdbRepo *pRepo) { } } - tsdbDebug("vgId:%d buffer pool is closed", REPO_ID(pRepo)); + tsdbDebug("vgId:%d, buffer pool is closed", REPO_ID(pRepo)); } SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) { @@ -134,7 +134,7 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) { pBufBlock->offset = 0; pBufBlock->remain = pBufPool->bufBlockSize; - tsdbDebug("vgId:%d buffer block is allocated, blockId:%" PRId64, REPO_ID(pRepo), pBufBlock->blockId); + tsdbDebug("vgId:%d, buffer block is allocated, blockId:%" PRId64, REPO_ID(pRepo), pBufBlock->blockId); return pNode; } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 55ddf3a34b..9fa3a11c9c 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -229,7 +229,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { if (handle == NULL) { // failed to register qhandle pRsp->code = terrno; terrno = 0; - vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo, + vError("vgId:%d, QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo, tstrerror(pRsp->code)); qDestroyQueryInfo(pQInfo); // destroy it directly return pRsp->code; From bd5f2641a88ad209c663ddbcb60e3ebf88e2bcbb Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 11 Nov 2020 18:16:53 +0800 Subject: [PATCH 10/28] implement TD-2059 --- src/inc/tsdb.h | 1 + src/tsdb/src/tsdbMemTable.c | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 993ec287a5..d7515a1495 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -323,6 +323,7 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int int tsdbInitCommitQueue(int nthreads); void tsdbDestroyCommitQueue(); +int tsdbSyncCommit(TSDB_REPO_T *repo); #ifdef __cplusplus } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 802ae3727b..5680abcc6f 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -278,6 +278,14 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { return 0; } +int tsdbSyncCommit(TSDB_REPO_T *repo) { + STsdbRepo *pRepo = (STsdbRepo *)repo; + tsdbAsyncCommit(pRepo); + sem_wait(&(pRepo->readyToCommit)); + sem_post(&(pRepo->readyToCommit)); + return 0; +} + /** * This is an important function to load data or try to load data from memory skiplist iterator. * From 3236104b54b2fb0cdde4765f4f51fe254e150681 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 11 Nov 2020 18:34:51 +0800 Subject: [PATCH 11/28] [TD-2004]: use queue to implement commit --- src/tsdb/src/tsdbCommitQueue.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index 0272d7b398..3c158a2201 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -13,8 +13,6 @@ * along with this program. If not, see . */ -#include - #include "os.h" #include "tlist.h" #include "tsdbMain.h" From d2b2e6a02c6d83fcde931f5ad28d5b0980a56b2e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 11 Nov 2020 10:39:54 +0000 Subject: [PATCH 12/28] add log --- src/balance/src/balance.c | 4 ++-- src/dnode/src/dnodeMInfos.c | 6 +++--- src/mnode/src/mnodeDnode.c | 2 +- src/mnode/src/mnodePeer.c | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/balance/src/balance.c b/src/balance/src/balance.c index 4c687cb134..df78f4fe27 100644 --- a/src/balance/src/balance.c +++ b/src/balance/src/balance.c @@ -571,8 +571,8 @@ static void balanceCheckDnodeAccess() { if (pDnode->status != TAOS_DN_STATUS_DROPPING && pDnode->status != TAOS_DN_STATUS_OFFLINE) { pDnode->status = TAOS_DN_STATUS_OFFLINE; pDnode->offlineReason = TAOS_DN_OFF_STATUS_MSG_TIMEOUT; - mInfo("dnode:%d, set to offline state, access seq:%d, last seq:%d", pDnode->dnodeId, tsAccessSquence, - pDnode->lastAccess); + mInfo("dnode:%d, set to offline state, access seq:%d last seq:%d laststat:%d", pDnode->dnodeId, tsAccessSquence, + pDnode->lastAccess, pDnode->status); balanceSetVgroupOffline(pDnode); } } diff --git a/src/dnode/src/dnodeMInfos.c b/src/dnode/src/dnodeMInfos.c index c985db371d..cefe44aebe 100644 --- a/src/dnode/src/dnodeMInfos.c +++ b/src/dnode/src/dnodeMInfos.c @@ -77,15 +77,15 @@ void dnodeUpdateMInfos(SMnodeInfos *minfos) { void dnodeUpdateEpSetForPeer(SRpcEpSet *ep) { if (ep->numOfEps <= 0) { - dError("mnode EP list for peer is changed, but content is invalid, discard it"); + dError("minfos is changed, but content is invalid, discard it"); return; } pthread_mutex_lock(&tsMInfosMutex); - dInfo("mnode EP list for peer is changed, numOfEps:%d inUse:%d", ep->numOfEps, ep->inUse); + dInfo("minfos is changed, numOfEps:%d inUse:%d", ep->numOfEps, ep->inUse); for (int i = 0; i < ep->numOfEps; ++i) { ep->port[i] -= TSDB_PORT_DNODEDNODE; - dInfo("mnode index:%d %s:%u", i, ep->fqdn[i], ep->port[i]); + dInfo("minfo:%d %s:%u", i, ep->fqdn[i], ep->port[i]); } tsMEpSet = *ep; pthread_mutex_unlock(&tsMInfosMutex); diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index 37e00fc4e3..7e34e08373 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -584,7 +584,7 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) { return TSDB_CODE_MND_CLUSTER_CFG_INCONSISTENT; } - mDebug("dnode:%d, from offline to online", pDnode->dnodeId); + mInfo("dnode:%d, from offline to online", pDnode->dnodeId); pDnode->status = TAOS_DN_STATUS_READY; pDnode->offlineReason = TAOS_DN_OFF_ONLINE; balanceSyncNotify(); diff --git a/src/mnode/src/mnodePeer.c b/src/mnode/src/mnodePeer.c index 2a04f541c5..e43f8c1b78 100644 --- a/src/mnode/src/mnodePeer.c +++ b/src/mnode/src/mnodePeer.c @@ -63,9 +63,9 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { for (int32_t i = 0; i < epSet->numOfEps; ++i) { if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) { epSet->inUse = (i + 1) % epSet->numOfEps; - mDebug("mnode index:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse); + mDebug("mpeer:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse); } else { - mDebug("mnode index:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i])); + mDebug("mpeer:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i])); } } From de74b9ccfc8bf6dad6e8a70b78c11b40c9d60810 Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Wed, 11 Nov 2020 18:10:22 +0800 Subject: [PATCH 13/28] [TD-1998]: cannot parse valid sql statement --- src/client/src/tscUtil.c | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index bb9725a744..c452b51050 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1238,8 +1238,7 @@ void tscColumnListDestroy(SArray* pColumnList) { * */ static int32_t validateQuoteToken(SStrToken* pToken) { - strdequote(pToken->z); - pToken->n = (uint32_t)strtrim(pToken->z); + tscDequoteAndTrimToken(pToken); int32_t k = tSQLGetToken(pToken->z, &pToken->type); @@ -1254,8 +1253,6 @@ static int32_t validateQuoteToken(SStrToken* pToken) { } void tscDequoteAndTrimToken(SStrToken* pToken) { - assert(pToken->type == TK_STRING); - uint32_t first = 0, last = pToken->n; // trim leading spaces @@ -1367,7 +1364,8 @@ int32_t tscValidateName(SStrToken* pToken) { } else { pStr[firstPartLen] = TS_PATH_DELIMITER[0]; memmove(&pStr[firstPartLen + 1], pToken->z, pToken->n); - pStr[firstPartLen + sizeof(TS_PATH_DELIMITER[0]) + pToken->n] = 0; + uint32_t offset = (uint32_t)(pToken->z - (pStr + firstPartLen + 1)); + memset(pToken->z + pToken->n - offset, ' ', offset); } pToken->n += (firstPartLen + sizeof(TS_PATH_DELIMITER[0])); pToken->z = pStr; From 59b25a96d7f7db01cb5dd77165a8f3e10fde9a59 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 11 Nov 2020 20:21:37 +0800 Subject: [PATCH 14/28] TD-1896 --- src/vnode/src/vnodeMain.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index dab07d189b..138455e1b5 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -317,6 +317,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->version = walGetVersion(pVnode->wal); } + tsdbSyncCommit(pVnode->tsdb); walRenew(pVnode->wal); SSyncInfo syncInfo; From 40728d1b316b78eab469686782d190aaca3d72e5 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 11 Nov 2020 22:38:46 +0800 Subject: [PATCH 15/28] TD-1949 --- src/inc/twal.h | 21 +++++++++++---------- src/vnode/src/vnodeMain.c | 3 ++- src/wal/src/walMgmt.c | 11 +---------- src/wal/src/walWrite.c | 18 +++++++++++++++++- 4 files changed, 31 insertions(+), 22 deletions(-) diff --git a/src/inc/twal.h b/src/inc/twal.h index b85377d8d4..8dd3a8a912 100644 --- a/src/inc/twal.h +++ b/src/inc/twal.h @@ -54,16 +54,17 @@ typedef int32_t FWalWrite(void *ahandle, void *pHead, int32_t qtype, void *pMsg) int32_t walInit(); void walCleanUp(); -twalh walOpen(char *path, SWalCfg *pCfg); -int32_t walAlter(twalh pWal, SWalCfg *pCfg); -void walStop(twalh); -void walClose(twalh); -int32_t walRenew(twalh); -void walRemoveOldFiles(twalh); -int32_t walWrite(twalh, SWalHead *); -void walFsync(twalh, bool forceFsync); -int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp); -int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId); +twalh walOpen(char *path, SWalCfg *pCfg); +int32_t walAlter(twalh pWal, SWalCfg *pCfg); +void walStop(twalh); +void walClose(twalh); +int32_t walRenew(twalh); +void walRemoveOneOldFile(twalh); +void walRemoveAllOldFiles(twalh); +int32_t walWrite(twalh, SWalHead *); +void walFsync(twalh, bool forceFsync); +int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp); +int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId); uint64_t walGetVersion(twalh); #ifdef __cplusplus diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 138455e1b5..e1ea89a76a 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -318,6 +318,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { } tsdbSyncCommit(pVnode->tsdb); + walRemoveAllOldFiles(pVnode->tsdb); walRenew(pVnode->wal); SSyncInfo syncInfo; @@ -593,7 +594,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status) { if (status == TSDB_STATUS_COMMIT_OVER) { vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version); - walRemoveOldFiles(pVnode->wal); + walRemoveOneOldFile(pVnode->wal); return vnodeSaveVersion(pVnode); } diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c index 9ba0dfd124..fb49f38217 100644 --- a/src/wal/src/walMgmt.c +++ b/src/wal/src/walMgmt.c @@ -128,16 +128,7 @@ void walClose(void *handle) { taosClose(pWal->fd); if (pWal->keep != TAOS_WAL_KEEP) { - int64_t fileId = -1; - while (walGetNextFile(pWal, &fileId) >= 0) { - snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId); - - if (remove(pWal->name) < 0) { - wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name); - } else { - wInfo("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name); - } - } + walRemoveAllOldFiles(pWal); } else { wDebug("vgId:%d, wal:%p file:%s, it is closed and kept", pWal->vgId, pWal, pWal->name); } diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index d3a41ec6b2..48021eecfc 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -63,7 +63,7 @@ int32_t walRenew(void *handle) { return code; } -void walRemoveOldFiles(void *handle) { +void walRemoveOneOldFile(void *handle) { SWal *pWal = handle; if (pWal == NULL) return; if (pWal->keep == TAOS_WAL_KEEP) return; @@ -86,6 +86,22 @@ void walRemoveOldFiles(void *handle) { pthread_mutex_unlock(&pWal->mutex); } +void walRemoveAllOldFiles(void *handle) { + if (handle == NULL) return; + + SWal * pWal = handle; + int64_t fileId = -1; + while (walGetNextFile(pWal, &fileId) >= 0) { + snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId); + + if (remove(pWal->name) < 0) { + wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name); + } else { + wInfo("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name); + } + } +} + int32_t walWrite(void *handle, SWalHead *pHead) { if (handle == NULL) return -1; From a4cb007d6913d10aecccb12b96ba6bdfc05ba476 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 11 Nov 2020 23:26:20 +0800 Subject: [PATCH 16/28] TD-1896 --- src/vnode/src/vnodeMain.c | 2 +- tests/script/general/wal/maxtables.sim | 46 ++++++++++++++++++++++++++ tests/script/jenkins/basic.txt | 1 + 3 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 tests/script/general/wal/maxtables.sim diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index e1ea89a76a..7d45516ecc 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -318,7 +318,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { } tsdbSyncCommit(pVnode->tsdb); - walRemoveAllOldFiles(pVnode->tsdb); + walRemoveAllOldFiles(pVnode->wal); walRenew(pVnode->wal); SSyncInfo syncInfo; diff --git a/tests/script/general/wal/maxtables.sim b/tests/script/general/wal/maxtables.sim new file mode 100644 index 0000000000..f95997f1e4 --- /dev/null +++ b/tests/script/general/wal/maxtables.sim @@ -0,0 +1,46 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 100 +system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 1 +system sh/cfg.sh -n dnode1 -c tableIncStepPerVnode -v 2 + + +print ============== deploy +system sh/exec.sh -n dnode1 -s start +sleep 3001 +sql connect + +sql create database d1 +sql use d1 +sql create table st (ts timestamp, tbcol int) TAGS(tgcol int) + +$i = 0 +while $i < 100 + $tb = t . $i + sql create table $tb using st tags( $i ) + sql insert into $tb values (now , $i ) + $i = $i + 1 +endw + +sql_error sql create table tt (ts timestamp, i int) + +print =============== step3 +sql select * from st; +if $rows != 100 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 4 +sleep 3000 + +print =============== step4 +system sh/exec.sh -n dnode1 -s start -t +sleep 10000 + +sql select * from st; +if $rows != 100 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 2a84172da9..daf92679bd 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -238,6 +238,7 @@ cd ../../../debug; make ./test.sh -f general/wal/sync.sim ./test.sh -f general/wal/kill.sim +./test.sh -f general/wal/maxtables.sim ./test.sh -f unique/account/account_create.sim ./test.sh -f unique/account/account_delete.sim From 391e95bc5867134c6e29edf995a3d5bb668a970b Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 11 Nov 2020 23:27:19 +0800 Subject: [PATCH 17/28] script --- tests/script/general/wal/maxtables.sim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/script/general/wal/maxtables.sim b/tests/script/general/wal/maxtables.sim index f95997f1e4..e504c7e92e 100644 --- a/tests/script/general/wal/maxtables.sim +++ b/tests/script/general/wal/maxtables.sim @@ -35,8 +35,8 @@ system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 4 sleep 3000 print =============== step4 -system sh/exec.sh -n dnode1 -s start -t -sleep 10000 +system sh/exec.sh -n dnode1 -s start +sleep 3000 sql select * from st; if $rows != 100 then From ab6387cae2bc04179fe326923d80c5adaef0835f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 11 Nov 2020 23:34:21 +0800 Subject: [PATCH 18/28] log error while client version not matched with server version --- src/util/src/tutil.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c index 099b9d9530..451976f563 100644 --- a/src/util/src/tutil.c +++ b/src/util/src/tutil.c @@ -377,7 +377,8 @@ int taosCheckVersion(char *input_client_version, char *input_server_version, int for(int32_t i = 0; i < comparedSegments; ++i) { if (clientVersionNumber[i] != serverVersionNumber[i]) { - uError("the %d-th number of server version:%s not matched with client version:%s", i, server_version, version); + uError("the %d-th number of server version:%s not matched with client version:%s", i, server_version, + client_version); return TSDB_CODE_TSC_INVALID_VERSION; } } From 028e62c974f9695b134dc5014ad2fa3e37a707ec Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 11 Nov 2020 23:45:10 +0800 Subject: [PATCH 19/28] TD-2067 --- src/common/inc/tglobal.h | 1 + src/common/src/tglobal.c | 11 +++++++++++ src/vnode/src/vnodeMain.c | 4 +--- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index dbcf50ba77..4087f638a9 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -44,6 +44,7 @@ extern int32_t tsMaxShellConns; extern int32_t tsShellActivityTimer; extern uint32_t tsMaxTmrCtrl; extern float tsNumOfThreadsPerCore; +extern int32_t tsNumOfCommitThreads; extern float tsRatioOfQueryThreads; // todo remove it extern int8_t tsDaylight; extern char tsTimezone[]; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 3bc1e4d0cc..4495c3d928 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -51,6 +51,7 @@ int32_t tsMaxShellConns = 5000; int32_t tsMaxConnections = 5000; int32_t tsShellActivityTimer = 3; // second float tsNumOfThreadsPerCore = 1.0f; +int32_t tsNumOfCommitThreads = 1; float tsRatioOfQueryThreads = 0.5f; int8_t tsDaylight = 0; char tsTimezone[TSDB_TIMEZONE_LEN] = {0}; @@ -426,6 +427,16 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + cfg.option = "numOfCommitThreads"; + cfg.ptr = &tsNumOfCommitThreads; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG; + cfg.minValue = 1; + cfg.maxValue = 100; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + cfg.option = "ratioOfQueryThreads"; cfg.ptr = &tsRatioOfQueryThreads; cfg.valType = TAOS_CFG_VTYPE_FLOAT; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 7d45516ecc..199619e851 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -28,8 +28,6 @@ #include "vnodeCfg.h" #include "vnodeVersion.h" -#define DEFAULT_COMMIT_THREADS 1 - static SHashObj*tsVnodesHash; static void vnodeCleanUp(SVnodeObj *pVnode); static int vnodeProcessTsdbStatus(void *arg, int status); @@ -69,7 +67,7 @@ int32_t vnodeInitResources() { return TSDB_CODE_VND_OUT_OF_MEMORY; } - if (tsdbInitCommitQueue(DEFAULT_COMMIT_THREADS) < 0) { + if (tsdbInitCommitQueue(tsNumOfCommitThreads) < 0) { vError("failed to init vnode commit queue"); return terrno; } From cf094e9032ee5745f93802f12c9a8bce113a1224 Mon Sep 17 00:00:00 2001 From: Ping Xiao Date: Wed, 11 Nov 2020 16:08:25 +0800 Subject: [PATCH 20/28] [TD-1447] add test case for lowa --- tests/pytest/fulltest.sh | 2 +- tests/pytest/tools/insert.json | 50 ++++++++++++++++++++++++++ tests/pytest/tools/lowa.py | 66 ++++++++++++++++++++++++++++++++++ 3 files changed, 117 insertions(+), 1 deletion(-) create mode 100644 tests/pytest/tools/insert.json create mode 100644 tests/pytest/tools/lowa.py diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index a48dbdc480..15cadf38e7 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -154,7 +154,7 @@ python3 ./test.py -f query/queryConnection.py python3 ./test.py -f query/queryCountCSVData.py python3 ./test.py -f query/natualInterval.py python3 ./test.py -f query/bug1471.py -python3 ./test.py -f query/dataLossTest.py +#python3 ./test.py -f query/dataLossTest.py #stream python3 ./test.py -f stream/metric_1.py diff --git a/tests/pytest/tools/insert.json b/tests/pytest/tools/insert.json new file mode 100644 index 0000000000..c3fa78076b --- /dev/null +++ b/tests/pytest/tools/insert.json @@ -0,0 +1,50 @@ +{ + "filetype":"insert", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "thread_count": 1, + "databases": [{ + "dbinfo": { + "name": "db01", + "replica": 1, + "days": 10, + "cache": 16, + "blocks": 8, + "precision": "ms", + "update": 0, + "maxtablesPerVnode": 1000 + }, + "super_tables": [{ + "name": "stb01", + "childtable_count": 100, + "childtable_prefix": "stb01_", + "auto_create_table": "no", + "data_source": "rand", + "insert_mode": "taosc", + "insert_rate": 0, + "insert_rows": 1000, + "timestamp_step": 1000, + "start_timestamp": "2020-10-01 00:00:00.000", + "sample_format": "csv", + "sample_file": "/home/data/sample.csv", + "tags_file": "", + "columns": [{ + "type": "SMALLINT" + }, { + "type": "BOOL" + }, { + "type": "BINARY", + "len": 6 + }], + "tags": [{ + "type": "INT" + },{ + "type": "BINARY", + "len": 4 + }] + }] + }] +} diff --git a/tests/pytest/tools/lowa.py b/tests/pytest/tools/lowa.py new file mode 100644 index 0000000000..523229dd46 --- /dev/null +++ b/tests/pytest/tools/lowa.py @@ -0,0 +1,66 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import os +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import * + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + self.numberOfTables = 10000 + self.numberOfRecords = 100 + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root)-len("/build/bin")] + break + return buildPath + + def run(self): + tdSql.prepare() + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + binPath = buildPath+ "/build/bin/" + os.system("yes | %slowa -f tools/insert.json" % binPath) + + tdSql.execute("use db01") + tdSql.query("select count(*) from stb01") + tdSql.checkData(0, 0, 100000) + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) From b5d1aa4801b5f6f70f5dbfbde7358ca90ca5a811 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 12 Nov 2020 05:27:06 +0000 Subject: [PATCH 21/28] TD-1985 --- src/os/src/detail/osFile.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/os/src/detail/osFile.c b/src/os/src/detail/osFile.c index 6eb4515f30..23fc88b8e1 100644 --- a/src/os/src/detail/osFile.c +++ b/src/os/src/detail/osFile.c @@ -132,7 +132,7 @@ int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size) { // if (leftbytes > 1000000000) leftbytes = 1000000000; sentbytes = sendfile(dfd, sfd, offset, leftbytes); if (sentbytes == -1) { - if (errno == EINTR) { + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { continue; } else { return -1; From a31a6ed9a56a4741f95e67a45cb8cfcf68f280c6 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 12 Nov 2020 05:39:22 +0000 Subject: [PATCH 22/28] TD-2072 --- src/inc/tsdb.h | 2 ++ src/tsdb/src/tsdbCommitQueue.c | 14 +++++++++++++- src/vnode/src/vnodeMain.c | 2 ++ 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index d7515a1495..499aedfeed 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -324,6 +324,8 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int int tsdbInitCommitQueue(int nthreads); void tsdbDestroyCommitQueue(); int tsdbSyncCommit(TSDB_REPO_T *repo); +int tsdbIncCommitRef(int vgId); +void tsdbDecCommitRef(int vgId); #ifdef __cplusplus } diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index 3c158a2201..ad4e433274 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -15,6 +15,7 @@ #include "os.h" #include "tlist.h" +#include "tref.h" #include "tsdbMain.h" typedef struct { @@ -22,6 +23,7 @@ typedef struct { pthread_mutex_t lock; pthread_cond_t queueNotEmpty; int nthreads; + int refCount; SList * queue; pthread_t * threads; } SCommitQueue; @@ -123,7 +125,7 @@ static void *tsdbLoopCommit(void *arg) { while (true) { pNode = tdListPopHead(pQueue->queue); if (pNode == NULL) { - if (pQueue->stop) { + if (pQueue->stop && pQueue->refCount == 0) { pthread_mutex_unlock(&(pQueue->lock)); goto _exit; } else { @@ -145,3 +147,13 @@ static void *tsdbLoopCommit(void *arg) { _exit: return NULL; } + +int tsdbIncCommitRef(int vgId) { + int refCount = atomic_add_fetch_32(&tsCommitQueue.refCount, 1); + tsdbDebug("vgId:%d, inc commit queue ref to %d", refCount); +} + +void tsdbDecCommitRef(int vgId) { + int refCount = atomic_sub_fetch_32(&tsCommitQueue.refCount, 1); + tsdbDebug("vgId:%d, dec commit queue ref to %d", refCount); +} \ No newline at end of file diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 199619e851..7813c5217b 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -355,6 +355,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->status = TAOS_VN_STATUS_READY; vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode); + tsdbIncCommitRef(pVnode->vgId); taosHashPut(tsVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *)); return TSDB_CODE_SUCCESS; @@ -446,6 +447,7 @@ void vnodeRelease(void *pVnodeRaw) { tsem_destroy(&pVnode->sem); free(pVnode); + tsdbDecCommitRef(vgId); int32_t count = taosHashGetSize(tsVnodesHash); vDebug("vgId:%d, vnode is destroyed, vnodes:%d", vgId, count); From 9d8beb659836b1ba429a559f20855e05605790d3 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 12 Nov 2020 06:03:14 +0000 Subject: [PATCH 23/28] TD-2072 --- src/inc/tsdb.h | 2 +- src/tsdb/src/tsdbCommitQueue.c | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 499aedfeed..0cdb24a4da 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -324,7 +324,7 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int int tsdbInitCommitQueue(int nthreads); void tsdbDestroyCommitQueue(); int tsdbSyncCommit(TSDB_REPO_T *repo); -int tsdbIncCommitRef(int vgId); +void tsdbIncCommitRef(int vgId); void tsdbDecCommitRef(int vgId); #ifdef __cplusplus diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index ad4e433274..49d24f0538 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -148,12 +148,12 @@ _exit: return NULL; } -int tsdbIncCommitRef(int vgId) { +void tsdbIncCommitRef(int vgId) { int refCount = atomic_add_fetch_32(&tsCommitQueue.refCount, 1); - tsdbDebug("vgId:%d, inc commit queue ref to %d", refCount); + tsdbDebug("vgId:%d, inc commit queue ref to %d", vgId, refCount); } void tsdbDecCommitRef(int vgId) { int refCount = atomic_sub_fetch_32(&tsCommitQueue.refCount, 1); - tsdbDebug("vgId:%d, dec commit queue ref to %d", refCount); + tsdbDebug("vgId:%d, dec commit queue ref to %d", vgId, refCount); } \ No newline at end of file From 04338e8eb5e24ef0680233503a44eed1c40e8148 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 12 Nov 2020 06:10:23 +0000 Subject: [PATCH 24/28] log --- src/sync/src/syncMain.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 5c92801cc3..f1ce1d43aa 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -1222,7 +1222,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle // always update version nodeVersion = pWalHead->version; - sDebug("vgId:%d, forward to peer, replica:%d role:%s qtype:%s hver:%" PRIu64, pNode->vgId, pNode->replica, + sTrace("vgId:%d, forward to peer, replica:%d role:%s qtype:%s hver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole], qtypeStr[qtype], pWalHead->version); if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; From 9d65e0e50544f69d2d0688ed4b1bac4d5ed36cbb Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 12 Nov 2020 06:16:04 +0000 Subject: [PATCH 25/28] TD-2072 --- src/tsdb/src/tsdbCommitQueue.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index 49d24f0538..9812b8fd5c 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -105,7 +105,7 @@ int tsdbScheduleCommit(STsdbRepo *pRepo) { pthread_mutex_lock(&(pQueue->lock)); - ASSERT(!pQueue->stop); + // ASSERT(pQueue->stop); tdListAppendNode(pQueue->queue, pNode); pthread_cond_signal(&(pQueue->queueNotEmpty)); From f6ee376fb696750868bdf3be937e2a5dcdae16da Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 12 Nov 2020 06:19:44 +0000 Subject: [PATCH 26/28] TD-2072 --- src/tsdb/src/tsdbCommitQueue.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index 9812b8fd5c..4fdd99ddda 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -125,7 +125,7 @@ static void *tsdbLoopCommit(void *arg) { while (true) { pNode = tdListPopHead(pQueue->queue); if (pNode == NULL) { - if (pQueue->stop && pQueue->refCount == 0) { + if (pQueue->stop && pQueue->refCount <= 0) { pthread_mutex_unlock(&(pQueue->lock)); goto _exit; } else { From 5a417fddd863aee939fd43d6fdb1f2bc38b680d0 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 12 Nov 2020 06:48:46 +0000 Subject: [PATCH 27/28] TD-2072 --- src/tsdb/src/tsdbCommitQueue.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index 4fdd99ddda..c86b8f32b7 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -155,5 +155,6 @@ void tsdbIncCommitRef(int vgId) { void tsdbDecCommitRef(int vgId) { int refCount = atomic_sub_fetch_32(&tsCommitQueue.refCount, 1); + pthread_cond_broadcast(&(tsCommitQueue.queueNotEmpty)); tsdbDebug("vgId:%d, dec commit queue ref to %d", vgId, refCount); } \ No newline at end of file From fa8de2ce3d944c1b5d1dd6b398ce668925bca5a6 Mon Sep 17 00:00:00 2001 From: Xiaxin Li Date: Fri, 13 Nov 2020 09:26:37 +0800 Subject: [PATCH 28/28] Update TAOS SQL-ch.md fix a mistake --- documentation20/webdocs/markdowndocs/TAOS SQL-ch.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/documentation20/webdocs/markdowndocs/TAOS SQL-ch.md b/documentation20/webdocs/markdowndocs/TAOS SQL-ch.md index 905d3b2cd7..760ebae4fc 100644 --- a/documentation20/webdocs/markdowndocs/TAOS SQL-ch.md +++ b/documentation20/webdocs/markdowndocs/TAOS SQL-ch.md @@ -90,7 +90,7 @@ TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMic ```mysql ALTER DATABASE db_name REPLICA 2; ``` - REPLICA参数是指修改数据库副本数,取值范围[1, 3]。在集群中使用,副本数必须小于dnode的数目。 + REPLICA参数是指修改数据库副本数,取值范围[1, 3]。在集群中使用,副本数必须小于或等于dnode的数目。 ```mysql ALTER DATABASE db_name KEEP 365;