diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index d1972bdcea..7abe3e99c7 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -94,7 +94,7 @@ STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH); int tsdbCloseRepo(STsdbRepo *repo, int toCommit); int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg); int tsdbGetState(STsdbRepo *repo); -bool tsdbInCompact(STsdbRepo *repo); +int8_t tsdbGetCompactState(STsdbRepo *repo); // --------- TSDB TABLE DEFINITION typedef struct { uint64_t uid; // the unique table ID diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 36bfb1d442..5323b4306f 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -6661,19 +6661,20 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { if (isNull(val, type)) { continue; } - + char* p = val; size_t keyLen = 0; if (IS_VAR_DATA_TYPE(pOperator->pExpr->base.colType)) { tstr* var = (tstr*)(val); + p = var->data; keyLen = varDataLen(var); } else { keyLen = bytes; } int dummy; - void* res = taosHashGet(pInfo->pSet, val, keyLen); + void* res = taosHashGet(pInfo->pSet, p, keyLen); if (res == NULL) { - taosHashPut(pInfo->pSet, val, keyLen, &dummy, sizeof(dummy)); + taosHashPut(pInfo->pSet, p, keyLen, &dummy, sizeof(dummy)); char* start = pResultColInfoData->pData + bytes * pInfo->pRes->info.rows; memcpy(start, val, bytes); pRes->info.rows += 1; diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index 757a0951e8..532907ae01 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -94,8 +94,9 @@ struct STsdbRepo { pthread_mutex_t mutex; bool repoLocked; int32_t code; // Commit code + SMergeBuf mergeBuf; //used when update=2 - bool inCompact; // is in compact process? + int8_t compactState; // compact state: inCompact/noCompact/waitingCompact? }; #define REPO_ID(r) (r)->config.tsdbId diff --git a/src/tsdb/src/tsdbCompact.c b/src/tsdb/src/tsdbCompact.c index 0490f26b5e..62f9e41119 100644 --- a/src/tsdb/src/tsdbCompact.c +++ b/src/tsdb/src/tsdbCompact.c @@ -58,6 +58,7 @@ static int tsdbCompactFSetImpl(SCompactH *pComph); static int tsdbWriteBlockToRightFile(SCompactH *pComph, STable *pTable, SDataCols *pDataCols, void **ppBuf, void **ppCBuf); +enum { TSDB_NO_COMPACT, TSDB_IN_COMPACT, TSDB_WAITING_COMPACT}; int tsdbCompact(STsdbRepo *pRepo) { return tsdbAsyncCompact(pRepo); } void *tsdbCompactImpl(STsdbRepo *pRepo) { @@ -89,16 +90,21 @@ _err: } static int tsdbAsyncCompact(STsdbRepo *pRepo) { + if (pRepo->compactState != TSDB_NO_COMPACT) { + tsdbInfo("vgId:%d not compact tsdb again ", REPO_ID(pRepo)); + return 0; + } + pRepo->compactState = TSDB_WAITING_COMPACT; tsem_wait(&(pRepo->readyToCommit)); return tsdbScheduleCommit(pRepo, COMPACT_REQ); } static void tsdbStartCompact(STsdbRepo *pRepo) { - ASSERT(!pRepo->inCompact); + assert(pRepo->compactState != TSDB_IN_COMPACT); tsdbInfo("vgId:%d start to compact!", REPO_ID(pRepo)); tsdbStartFSTxn(pRepo, 0, 0); pRepo->code = TSDB_CODE_SUCCESS; - pRepo->inCompact = true; + pRepo->compactState = TSDB_IN_COMPACT; } static void tsdbEndCompact(STsdbRepo *pRepo, int eno) { @@ -107,7 +113,7 @@ static void tsdbEndCompact(STsdbRepo *pRepo, int eno) { } else { tsdbEndFSTxn(pRepo); } - pRepo->inCompact = false; + pRepo->compactState = TSDB_NO_COMPACT; tsdbInfo("vgId:%d compact over, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed"); tsem_post(&(pRepo->readyToCommit)); } diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index c877bfc7af..b2e6fe8916 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -200,7 +200,7 @@ STsdbRepoInfo *tsdbGetStatus(STsdbRepo *pRepo) { return NULL; } int tsdbGetState(STsdbRepo *repo) { return repo->state; } -bool tsdbInCompact(STsdbRepo *repo) { return repo->inCompact; } +int8_t tsdbGetCompactState(STsdbRepo *repo) { return (int8_t)(repo->compactState); } void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage) { ASSERT(repo != NULL); @@ -541,7 +541,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { pRepo->state = TSDB_STATE_OK; pRepo->code = TSDB_CODE_SUCCESS; - pRepo->inCompact = false; + pRepo->compactState = 0; pRepo->config = *pCfg; if (pAppH) { pRepo->appH = *pAppH; diff --git a/src/vnode/src/vnodeMgmt.c b/src/vnode/src/vnodeMgmt.c index 67b9ce5ad9..8d699cb100 100644 --- a/src/vnode/src/vnodeMgmt.c +++ b/src/vnode/src/vnodeMgmt.c @@ -162,7 +162,7 @@ static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SStatusMsg *pStatus) { pLoad->status = pVnode->status; pLoad->role = pVnode->role; pLoad->replica = pVnode->syncCfg.replica; - pLoad->compact = (pVnode->tsdb != NULL) && tsdbInCompact(pVnode->tsdb) ? 1 : 0; + pLoad->compact = (pVnode->tsdb != NULL) ? tsdbGetCompactState(pVnode->tsdb) : 0; } int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {