[td-225]merge from master
This commit is contained in:
commit
6375ee7894
|
@ -367,6 +367,7 @@ static bool bnMonitorBalance() {
|
|||
for (int32_t dest = 0; dest < src; dest++) {
|
||||
SDnodeObj *pDestDnode = tsBnDnodes.list[dest];
|
||||
if (bnCheckDnodeInVgroup(pDestDnode, pVgroup)) continue;
|
||||
if (taosGetTimestampMs() - pDestDnode->createdTime < 2000) continue;
|
||||
|
||||
float destScore = bnTryCalcDnodeScore(pDestDnode, 1);
|
||||
if (srcScore + 0.0001 < destScore) continue;
|
||||
|
|
|
@ -283,6 +283,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
|
|||
|
||||
SArray* tables = getTableList(pSql);
|
||||
if (tables == NULL) {
|
||||
pSub->lastSyncTime = 0; //force to get table list next time
|
||||
return 0;
|
||||
}
|
||||
size_t numOfTables = taosArrayGetSize(tables);
|
||||
|
@ -489,7 +490,15 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
|
|||
SSub *pSub = (SSub *)tsub;
|
||||
if (pSub == NULL) return NULL;
|
||||
|
||||
if (pSub->pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
|
||||
if (pSub->pTimer == NULL) {
|
||||
int64_t duration = taosGetTimestampMs() - pSub->lastConsumeTime;
|
||||
if (duration < (int64_t)(pSub->interval)) {
|
||||
tscDebug("subscription consume too frequently, blocking...");
|
||||
taosMsleep(pSub->interval - (int32_t)duration);
|
||||
}
|
||||
}
|
||||
|
||||
if (pSub->pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { //may reach here when retireve stable vgroup failed
|
||||
SSqlObj* pSql = recreateSqlObj(pSub);
|
||||
if (pSql == NULL) {
|
||||
return NULL;
|
||||
|
@ -503,6 +512,11 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
|
|||
|
||||
pSub->pSql = pSql;
|
||||
pSql->pSubscription = pSub;
|
||||
|
||||
// no table list now, force to update it
|
||||
tscDebug("begin table synchronization");
|
||||
if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL;
|
||||
tscDebug("table synchronization completed");
|
||||
}
|
||||
|
||||
tscSaveSubscriptionProgress(pSub);
|
||||
|
@ -527,14 +541,6 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
|
|||
tscDebug("subscribe:%s set next round subscribe skey:%"PRId64, pSub->topic, pQueryInfo->window.skey);
|
||||
}
|
||||
|
||||
if (pSub->pTimer == NULL) {
|
||||
int64_t duration = taosGetTimestampMs() - pSub->lastConsumeTime;
|
||||
if (duration < (int64_t)(pSub->interval)) {
|
||||
tscDebug("subscription consume too frequently, blocking...");
|
||||
taosMsleep(pSub->interval - (int32_t)duration);
|
||||
}
|
||||
}
|
||||
|
||||
size_t size = taosArrayGetSize(pSub->progress) * sizeof(STableIdInfo);
|
||||
size += sizeof(SQueryTableMsg) + 4096;
|
||||
int code = tscAllocPayload(&pSql->cmd, (int)size);
|
||||
|
|
|
@ -27,6 +27,7 @@ void dnodeUpdateCfg(SDnodeCfg *cfg);
|
|||
int32_t dnodeGetDnodeId();
|
||||
void dnodeGetClusterId(char *clusterId);
|
||||
void dnodeGetCfg(int32_t *dnodeId, char *clusterId);
|
||||
void dnodeSetDropped();
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
|
||||
static SDnodeCfg tsCfg = {0};
|
||||
static pthread_mutex_t tsCfgMutex;
|
||||
static int32_t tsDnodeDropped;
|
||||
|
||||
static int32_t dnodeReadCfg();
|
||||
static int32_t dnodeWriteCfg();
|
||||
|
@ -34,6 +35,10 @@ int32_t dnodeInitCfg() {
|
|||
if (ret == 0) {
|
||||
dInfo("dnode cfg is initialized");
|
||||
}
|
||||
if (tsDnodeDropped) {
|
||||
dInfo("dnode is dropped, exiting");
|
||||
return -1;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -44,6 +49,14 @@ void dnodeUpdateCfg(SDnodeCfg *cfg) {
|
|||
dnodeResetCfg(cfg);
|
||||
}
|
||||
|
||||
void dnodeSetDropped() {
|
||||
pthread_mutex_lock(&tsCfgMutex);
|
||||
tsDnodeDropped = 1;
|
||||
|
||||
dnodeWriteCfg();
|
||||
pthread_mutex_unlock(&tsCfgMutex);
|
||||
}
|
||||
|
||||
int32_t dnodeGetDnodeId() {
|
||||
int32_t dnodeId = 0;
|
||||
pthread_mutex_lock(&tsCfgMutex);
|
||||
|
@ -119,6 +132,14 @@ static int32_t dnodeReadCfg() {
|
|||
}
|
||||
cfg.dnodeId = (int32_t)dnodeId->valueint;
|
||||
|
||||
cJSON *dnodeDropped = cJSON_GetObjectItem(root, "dnodeDropped");
|
||||
if (!dnodeDropped || dnodeDropped->type != cJSON_Number) {
|
||||
dError("failed to read %s, dnodeDropped not found", file);
|
||||
//goto PARSE_CFG_OVER;
|
||||
} else {
|
||||
tsDnodeDropped = (int32_t)dnodeDropped->valueint;
|
||||
}
|
||||
|
||||
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
|
||||
if (!clusterId || clusterId->type != cJSON_String) {
|
||||
dError("failed to read %s, clusterId not found", file);
|
||||
|
@ -154,6 +175,7 @@ static int32_t dnodeWriteCfg() {
|
|||
|
||||
len += snprintf(content + len, maxLen - len, "{\n");
|
||||
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", tsCfg.dnodeId);
|
||||
len += snprintf(content + len, maxLen - len, " \"dnodeDropped\": %d,\n", tsDnodeDropped);
|
||||
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%s\"\n", tsCfg.clusterId);
|
||||
len += snprintf(content + len, maxLen - len, "}\n");
|
||||
|
||||
|
|
|
@ -202,6 +202,7 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
|
|||
char clusterId[TSDB_CLUSTER_ID_LEN];
|
||||
dnodeGetClusterId(clusterId);
|
||||
if (clusterId[0] != '\0') {
|
||||
dnodeSetDropped();
|
||||
dError("exit zombie dropped dnode");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
|
|
@ -138,7 +138,6 @@ typedef struct SQueryInfo {
|
|||
bool hasFilter;
|
||||
bool onlyTagQuery;
|
||||
bool orderProjectQuery;
|
||||
// bool diffQuery;
|
||||
bool stateWindow;
|
||||
} SQueryInfo;
|
||||
|
||||
|
|
Loading…
Reference in New Issue