[td-225]fix concurrent issue during building the heartbeat message.
This commit is contained in:
parent
c0d80b875d
commit
24db3339cf
|
@ -227,16 +227,16 @@ void tscKillStream(STscObj *pObj, uint32_t killId) {
|
||||||
|
|
||||||
int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
|
int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
|
||||||
SHeartBeatMsg *pHeartbeat = pMsg;
|
SHeartBeatMsg *pHeartbeat = pMsg;
|
||||||
|
|
||||||
int allocedQueriesNum = pHeartbeat->numOfQueries;
|
int allocedQueriesNum = pHeartbeat->numOfQueries;
|
||||||
int allocedStreamsNum = pHeartbeat->numOfStreams;
|
int allocedStreamsNum = pHeartbeat->numOfStreams;
|
||||||
|
|
||||||
pHeartbeat->numOfQueries = 0;
|
pHeartbeat->numOfQueries = 0;
|
||||||
SQueryDesc *pQdesc = (SQueryDesc *)pHeartbeat->pData;
|
SQueryDesc *pQdesc = (SQueryDesc *)pHeartbeat->pData;
|
||||||
|
|
||||||
// We extract the lock to tscBuildHeartBeatMsg function.
|
|
||||||
|
|
||||||
int64_t now = taosGetTimestampMs();
|
int64_t now = taosGetTimestampMs();
|
||||||
SSqlObj *pSql = pObj->sqlList;
|
SSqlObj *pSql = pObj->sqlList;
|
||||||
|
|
||||||
while (pSql) {
|
while (pSql) {
|
||||||
/*
|
/*
|
||||||
* avoid sqlobj may not be correctly removed from sql list
|
* avoid sqlobj may not be correctly removed from sql list
|
||||||
|
@ -248,45 +248,46 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tstrncpy(pQdesc->sql, pSql->sqlstr, sizeof(pQdesc->sql));
|
tstrncpy(pQdesc->sql, pSql->sqlstr, sizeof(pQdesc->sql));
|
||||||
pQdesc->stime = htobe64(pSql->stime);
|
pQdesc->stime = htobe64(pSql->stime);
|
||||||
pQdesc->queryId = htonl(pSql->queryId);
|
pQdesc->queryId = htonl(pSql->queryId);
|
||||||
//pQdesc->useconds = htobe64(pSql->res.useconds);
|
|
||||||
pQdesc->useconds = htobe64(now - pSql->stime);
|
pQdesc->useconds = htobe64(now - pSql->stime);
|
||||||
pQdesc->qId = htobe64(pSql->res.qId);
|
pQdesc->qId = htobe64(pSql->res.qId);
|
||||||
pQdesc->sqlObjId = htobe64(pSql->self);
|
pQdesc->sqlObjId = htobe64(pSql->self);
|
||||||
pQdesc->pid = pHeartbeat->pid;
|
pQdesc->pid = pHeartbeat->pid;
|
||||||
pQdesc->stableQuery = pSql->cmd.pQueryInfo->stableQuery;
|
|
||||||
pQdesc->numOfSub = pSql->subState.numOfSub;
|
pQdesc->numOfSub = pSql->subState.numOfSub;
|
||||||
|
pQdesc->stableQuery = pSql->cmd.pQueryInfo->stableQuery;
|
||||||
|
|
||||||
char *p = pQdesc->subSqlInfo;
|
char *p = pQdesc->subSqlInfo;
|
||||||
int32_t remainLen = sizeof(pQdesc->subSqlInfo);
|
int32_t remainLen = sizeof(pQdesc->subSqlInfo);
|
||||||
if (pQdesc->numOfSub == 0) {
|
if (pQdesc->numOfSub == 0) {
|
||||||
snprintf(p, remainLen, "N/A");
|
snprintf(p, remainLen, "N/A");
|
||||||
} else {
|
} else {
|
||||||
int32_t len;
|
|
||||||
if (pSql->pSubs != NULL && pSql->subState.states != NULL) {
|
if (pSql->pSubs != NULL && pSql->subState.states != NULL) {
|
||||||
for (int32_t i = 0; i < pQdesc->numOfSub; ++i) {
|
for (int32_t i = 0; i < pQdesc->numOfSub; ++i) {
|
||||||
SSqlObj* psub = pSql->pSubs[i];
|
SSqlObj *psub = pSql->pSubs[i];
|
||||||
int64_t self = (psub != NULL)? psub->self:0;
|
int64_t self = (psub != NULL)? psub->self : 0;
|
||||||
len = snprintf(p, remainLen, "[%d]0x%" PRIx64 "(%c) ", i,
|
|
||||||
self,
|
int32_t len = snprintf(p, remainLen, "[%d]0x%" PRIx64 "(%c) ", i, self, pSql->subState.states[i] ? 'C' : 'I');
|
||||||
pSql->subState.states[i] ? 'C' : 'I');
|
if (len > remainLen) {
|
||||||
if (len > remainLen) {
|
break;
|
||||||
break;
|
}
|
||||||
|
|
||||||
|
remainLen -= len;
|
||||||
|
p += len;
|
||||||
}
|
}
|
||||||
remainLen -= len;
|
|
||||||
p += len;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pQdesc->numOfSub = htonl(pQdesc->numOfSub);
|
|
||||||
|
|
||||||
|
pQdesc->numOfSub = htonl(pQdesc->numOfSub);
|
||||||
taosGetFqdn(pQdesc->fqdn);
|
taosGetFqdn(pQdesc->fqdn);
|
||||||
|
|
||||||
pHeartbeat->numOfQueries++;
|
pHeartbeat->numOfQueries++;
|
||||||
pQdesc++;
|
pQdesc++;
|
||||||
|
|
||||||
pSql = pSql->next;
|
pSql = pSql->next;
|
||||||
if (pHeartbeat->numOfQueries >= allocedQueriesNum) break;
|
if (pHeartbeat->numOfQueries >= allocedQueriesNum) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pHeartbeat->numOfStreams = 0;
|
pHeartbeat->numOfStreams = 0;
|
||||||
|
|
|
@ -1490,6 +1490,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tscFreeSubobj(pSql);
|
tscFreeSubobj(pSql);
|
||||||
|
|
||||||
pSql->signature = NULL;
|
pSql->signature = NULL;
|
||||||
pSql->fp = NULL;
|
pSql->fp = NULL;
|
||||||
tfree(pSql->sqlstr);
|
tfree(pSql->sqlstr);
|
||||||
|
|
Loading…
Reference in New Issue