Merge pull request #7106 from taosdata/fix/TD-5618

Fix/td 5618
This commit is contained in:
Haojun Liao 2021-08-03 14:08:38 +08:00 committed by GitHub
commit f0a37123b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 19 additions and 11 deletions

View File

@ -94,7 +94,7 @@ STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH);
int tsdbCloseRepo(STsdbRepo *repo, int toCommit); int tsdbCloseRepo(STsdbRepo *repo, int toCommit);
int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg); int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg);
int tsdbGetState(STsdbRepo *repo); int tsdbGetState(STsdbRepo *repo);
bool tsdbInCompact(STsdbRepo *repo); int8_t tsdbGetCompactState(STsdbRepo *repo);
// --------- TSDB TABLE DEFINITION // --------- TSDB TABLE DEFINITION
typedef struct { typedef struct {
uint64_t uid; // the unique table ID uint64_t uid; // the unique table ID

View File

@ -6661,19 +6661,20 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
if (isNull(val, type)) { if (isNull(val, type)) {
continue; continue;
} }
char* p = val;
size_t keyLen = 0; size_t keyLen = 0;
if (IS_VAR_DATA_TYPE(pOperator->pExpr->base.colType)) { if (IS_VAR_DATA_TYPE(pOperator->pExpr->base.colType)) {
tstr* var = (tstr*)(val); tstr* var = (tstr*)(val);
p = var->data;
keyLen = varDataLen(var); keyLen = varDataLen(var);
} else { } else {
keyLen = bytes; keyLen = bytes;
} }
int dummy; int dummy;
void* res = taosHashGet(pInfo->pSet, val, keyLen); void* res = taosHashGet(pInfo->pSet, p, keyLen);
if (res == NULL) { if (res == NULL) {
taosHashPut(pInfo->pSet, val, keyLen, &dummy, sizeof(dummy)); taosHashPut(pInfo->pSet, p, keyLen, &dummy, sizeof(dummy));
char* start = pResultColInfoData->pData + bytes * pInfo->pRes->info.rows; char* start = pResultColInfoData->pData + bytes * pInfo->pRes->info.rows;
memcpy(start, val, bytes); memcpy(start, val, bytes);
pRes->info.rows += 1; pRes->info.rows += 1;

View File

@ -94,8 +94,9 @@ struct STsdbRepo {
pthread_mutex_t mutex; pthread_mutex_t mutex;
bool repoLocked; bool repoLocked;
int32_t code; // Commit code int32_t code; // Commit code
SMergeBuf mergeBuf; //used when update=2 SMergeBuf mergeBuf; //used when update=2
bool inCompact; // is in compact process? int8_t compactState; // compact state: inCompact/noCompact/waitingCompact?
}; };
#define REPO_ID(r) (r)->config.tsdbId #define REPO_ID(r) (r)->config.tsdbId

View File

@ -58,6 +58,7 @@ static int tsdbCompactFSetImpl(SCompactH *pComph);
static int tsdbWriteBlockToRightFile(SCompactH *pComph, STable *pTable, SDataCols *pDataCols, void **ppBuf, static int tsdbWriteBlockToRightFile(SCompactH *pComph, STable *pTable, SDataCols *pDataCols, void **ppBuf,
void **ppCBuf); void **ppCBuf);
enum { TSDB_NO_COMPACT, TSDB_IN_COMPACT, TSDB_WAITING_COMPACT};
int tsdbCompact(STsdbRepo *pRepo) { return tsdbAsyncCompact(pRepo); } int tsdbCompact(STsdbRepo *pRepo) { return tsdbAsyncCompact(pRepo); }
void *tsdbCompactImpl(STsdbRepo *pRepo) { void *tsdbCompactImpl(STsdbRepo *pRepo) {
@ -89,16 +90,21 @@ _err:
} }
static int tsdbAsyncCompact(STsdbRepo *pRepo) { static int tsdbAsyncCompact(STsdbRepo *pRepo) {
if (pRepo->compactState != TSDB_NO_COMPACT) {
tsdbInfo("vgId:%d not compact tsdb again ", REPO_ID(pRepo));
return 0;
}
pRepo->compactState = TSDB_WAITING_COMPACT;
tsem_wait(&(pRepo->readyToCommit)); tsem_wait(&(pRepo->readyToCommit));
return tsdbScheduleCommit(pRepo, COMPACT_REQ); return tsdbScheduleCommit(pRepo, COMPACT_REQ);
} }
static void tsdbStartCompact(STsdbRepo *pRepo) { static void tsdbStartCompact(STsdbRepo *pRepo) {
ASSERT(!pRepo->inCompact); assert(pRepo->compactState != TSDB_IN_COMPACT);
tsdbInfo("vgId:%d start to compact!", REPO_ID(pRepo)); tsdbInfo("vgId:%d start to compact!", REPO_ID(pRepo));
tsdbStartFSTxn(pRepo, 0, 0); tsdbStartFSTxn(pRepo, 0, 0);
pRepo->code = TSDB_CODE_SUCCESS; pRepo->code = TSDB_CODE_SUCCESS;
pRepo->inCompact = true; pRepo->compactState = TSDB_IN_COMPACT;
} }
static void tsdbEndCompact(STsdbRepo *pRepo, int eno) { static void tsdbEndCompact(STsdbRepo *pRepo, int eno) {
@ -107,7 +113,7 @@ static void tsdbEndCompact(STsdbRepo *pRepo, int eno) {
} else { } else {
tsdbEndFSTxn(pRepo); tsdbEndFSTxn(pRepo);
} }
pRepo->inCompact = false; pRepo->compactState = TSDB_NO_COMPACT;
tsdbInfo("vgId:%d compact over, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed"); tsdbInfo("vgId:%d compact over, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed");
tsem_post(&(pRepo->readyToCommit)); tsem_post(&(pRepo->readyToCommit));
} }

View File

@ -200,7 +200,7 @@ STsdbRepoInfo *tsdbGetStatus(STsdbRepo *pRepo) { return NULL; }
int tsdbGetState(STsdbRepo *repo) { return repo->state; } int tsdbGetState(STsdbRepo *repo) { return repo->state; }
bool tsdbInCompact(STsdbRepo *repo) { return repo->inCompact; } int8_t tsdbGetCompactState(STsdbRepo *repo) { return (int8_t)(repo->compactState); }
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage) { void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage) {
ASSERT(repo != NULL); ASSERT(repo != NULL);
@ -541,7 +541,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
pRepo->state = TSDB_STATE_OK; pRepo->state = TSDB_STATE_OK;
pRepo->code = TSDB_CODE_SUCCESS; pRepo->code = TSDB_CODE_SUCCESS;
pRepo->inCompact = false; pRepo->compactState = 0;
pRepo->config = *pCfg; pRepo->config = *pCfg;
if (pAppH) { if (pAppH) {
pRepo->appH = *pAppH; pRepo->appH = *pAppH;

View File

@ -162,7 +162,7 @@ static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SStatusMsg *pStatus) {
pLoad->status = pVnode->status; pLoad->status = pVnode->status;
pLoad->role = pVnode->role; pLoad->role = pVnode->role;
pLoad->replica = pVnode->syncCfg.replica; pLoad->replica = pVnode->syncCfg.replica;
pLoad->compact = (pVnode->tsdb != NULL) && tsdbInCompact(pVnode->tsdb) ? 1 : 0; pLoad->compact = (pVnode->tsdb != NULL) ? tsdbGetCompactState(pVnode->tsdb) : 0;
} }
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) { int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {