Merge pull request #6444 from taosdata/hotfix/TD-4642-2
[TD-4642]client subscribe issue
This commit is contained in:
commit
d30431ff88
|
@ -282,6 +282,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
|
||||||
|
|
||||||
SArray* tables = getTableList(pSql);
|
SArray* tables = getTableList(pSql);
|
||||||
if (tables == NULL) {
|
if (tables == NULL) {
|
||||||
|
pSub->lastSyncTime = 0; //force to get table list next time
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
size_t numOfTables = taosArrayGetSize(tables);
|
size_t numOfTables = taosArrayGetSize(tables);
|
||||||
|
@ -488,7 +489,15 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
|
||||||
SSub *pSub = (SSub *)tsub;
|
SSub *pSub = (SSub *)tsub;
|
||||||
if (pSub == NULL) return NULL;
|
if (pSub == NULL) return NULL;
|
||||||
|
|
||||||
if (pSub->pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
|
if (pSub->pTimer == NULL) {
|
||||||
|
int64_t duration = taosGetTimestampMs() - pSub->lastConsumeTime;
|
||||||
|
if (duration < (int64_t)(pSub->interval)) {
|
||||||
|
tscDebug("subscription consume too frequently, blocking...");
|
||||||
|
taosMsleep(pSub->interval - (int32_t)duration);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pSub->pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { //may reach here when retireve stable vgroup failed
|
||||||
SSqlObj* pSql = recreateSqlObj(pSub);
|
SSqlObj* pSql = recreateSqlObj(pSub);
|
||||||
if (pSql == NULL) {
|
if (pSql == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -500,6 +509,11 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
|
||||||
}
|
}
|
||||||
pSub->pSql = pSql;
|
pSub->pSql = pSql;
|
||||||
pSql->pSubscription = pSub;
|
pSql->pSubscription = pSub;
|
||||||
|
|
||||||
|
// no table list now, force to update it
|
||||||
|
tscDebug("begin table synchronization");
|
||||||
|
if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL;
|
||||||
|
tscDebug("table synchronization completed");
|
||||||
}
|
}
|
||||||
|
|
||||||
tscSaveSubscriptionProgress(pSub);
|
tscSaveSubscriptionProgress(pSub);
|
||||||
|
@ -524,14 +538,6 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
|
||||||
tscDebug("subscribe:%s set next round subscribe skey:%"PRId64, pSub->topic, pQueryInfo->window.skey);
|
tscDebug("subscribe:%s set next round subscribe skey:%"PRId64, pSub->topic, pQueryInfo->window.skey);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSub->pTimer == NULL) {
|
|
||||||
int64_t duration = taosGetTimestampMs() - pSub->lastConsumeTime;
|
|
||||||
if (duration < (int64_t)(pSub->interval)) {
|
|
||||||
tscDebug("subscription consume too frequently, blocking...");
|
|
||||||
taosMsleep(pSub->interval - (int32_t)duration);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t size = taosArrayGetSize(pSub->progress) * sizeof(STableIdInfo);
|
size_t size = taosArrayGetSize(pSub->progress) * sizeof(STableIdInfo);
|
||||||
size += sizeof(SQueryTableMsg) + 4096;
|
size += sizeof(SQueryTableMsg) + 4096;
|
||||||
int code = tscAllocPayload(&pSql->cmd, (int)size);
|
int code = tscAllocPayload(&pSql->cmd, (int)size);
|
||||||
|
|
Loading…
Reference in New Issue