Merge pull request #25329 from taosdata/fix/3_liaohj
fix(stream): add lock, and fix race condition.
This commit is contained in:
commit
60cb1214f5
|
@ -29,21 +29,7 @@ typedef struct SCorEpSet {
|
||||||
|
|
||||||
#define GET_ACTIVE_EP(_eps) (&((_eps)->eps[(_eps)->inUse]))
|
#define GET_ACTIVE_EP(_eps) (&((_eps)->eps[(_eps)->inUse]))
|
||||||
|
|
||||||
#define EPSET_TO_STR(_eps, tbuf) \
|
int32_t epsetToStr(const SEpSet* pEpSet, char* pBuf, int32_t len);
|
||||||
do { \
|
|
||||||
int len = snprintf((tbuf), sizeof(tbuf), "epset:{"); \
|
|
||||||
for (int _i = 0; _i < (_eps)->numOfEps; _i++) { \
|
|
||||||
if (_i == (_eps)->numOfEps - 1) { \
|
|
||||||
len += \
|
|
||||||
snprintf((tbuf) + len, sizeof(tbuf) - len, "%d. %s:%d", _i, (_eps)->eps[_i].fqdn, (_eps)->eps[_i].port); \
|
|
||||||
} else { \
|
|
||||||
len += \
|
|
||||||
snprintf((tbuf) + len, sizeof(tbuf) - len, "%d. %s:%d, ", _i, (_eps)->eps[_i].fqdn, (_eps)->eps[_i].port); \
|
|
||||||
} \
|
|
||||||
} \
|
|
||||||
len += snprintf((tbuf) + len, sizeof(tbuf) - len, "}, inUse:%d", (_eps)->inUse); \
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp);
|
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp);
|
||||||
void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port);
|
void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port);
|
||||||
|
|
||||||
|
|
|
@ -516,7 +516,6 @@ typedef struct SStreamMeta {
|
||||||
TdThreadMutex backendMutex;
|
TdThreadMutex backendMutex;
|
||||||
SMetaHbInfo* pHbInfo;
|
SMetaHbInfo* pHbInfo;
|
||||||
STaskUpdateInfo updateInfo;
|
STaskUpdateInfo updateInfo;
|
||||||
SHashObj* pUpdateTaskSet;
|
|
||||||
int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta
|
int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta
|
||||||
int32_t numOfPausedTasks;
|
int32_t numOfPausedTasks;
|
||||||
int64_t rid;
|
int64_t rid;
|
||||||
|
|
|
@ -70,6 +70,7 @@ void epsetAssign(SEpSet* pDst, const SEpSet* pSrc) {
|
||||||
tstrncpy(pDst->eps[i].fqdn, pSrc->eps[i].fqdn, tListLen(pSrc->eps[i].fqdn));
|
tstrncpy(pDst->eps[i].fqdn, pSrc->eps[i].fqdn, tListLen(pSrc->eps[i].fqdn));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void epAssign(SEp* pDst, SEp* pSrc) {
|
void epAssign(SEp* pDst, SEp* pSrc) {
|
||||||
if (pSrc == NULL || pDst == NULL) {
|
if (pSrc == NULL || pDst == NULL) {
|
||||||
return;
|
return;
|
||||||
|
@ -78,6 +79,7 @@ void epAssign(SEp* pDst, SEp* pSrc) {
|
||||||
tstrncpy(pDst->fqdn, pSrc->fqdn, tListLen(pSrc->fqdn));
|
tstrncpy(pDst->fqdn, pSrc->fqdn, tListLen(pSrc->fqdn));
|
||||||
pDst->port = pSrc->port;
|
pDst->port = pSrc->port;
|
||||||
}
|
}
|
||||||
|
|
||||||
void epsetSort(SEpSet* pDst) {
|
void epsetSort(SEpSet* pDst) {
|
||||||
if (pDst->numOfEps <= 1) {
|
if (pDst->numOfEps <= 1) {
|
||||||
return;
|
return;
|
||||||
|
@ -127,6 +129,35 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet) {
|
||||||
return ep;
|
return ep;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t epsetToStr(const SEpSet* pEpSet, char* pBuf, int32_t bufLen) {
|
||||||
|
int len = snprintf(pBuf, bufLen, "epset:{");
|
||||||
|
if (len < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int _i = 0; (_i < pEpSet->numOfEps) && (bufLen > len); _i++) {
|
||||||
|
int32_t ret = 0;
|
||||||
|
|
||||||
|
if (_i == pEpSet->numOfEps - 1) {
|
||||||
|
ret = snprintf(pBuf + len, bufLen - len, "%d. %s:%d", _i, pEpSet->eps[_i].fqdn, pEpSet->eps[_i].port);
|
||||||
|
} else {
|
||||||
|
ret = snprintf(pBuf + len, bufLen - len, "%d. %s:%d, ", _i, pEpSet->eps[_i].fqdn, pEpSet->eps[_i].port);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ret < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
len += ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (len < bufLen) {
|
||||||
|
/*len += */snprintf(pBuf + len, bufLen - len, "}, inUse:%d", pEpSet->inUse);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime) {
|
int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime) {
|
||||||
SJson* pJson = tjsonCreateObject();
|
SJson* pJson = tjsonCreateObject();
|
||||||
if (pJson == NULL) return -1;
|
if (pJson == NULL) return -1;
|
||||||
|
|
|
@ -1747,7 +1747,8 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
|
||||||
const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
|
const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
|
||||||
|
|
||||||
char buf[256] = {0};
|
char buf[256] = {0};
|
||||||
EPSET_TO_STR(&pCurrent->epset, buf);
|
epsetToStr(&pCurrent->epset, buf, tListLen(buf));
|
||||||
|
|
||||||
mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId,
|
mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId,
|
||||||
pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated);
|
pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated);
|
||||||
|
|
||||||
|
@ -1898,7 +1899,7 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) {
|
||||||
taosArrayPush(plist, pEntry);
|
taosArrayPush(plist, pEntry);
|
||||||
|
|
||||||
char buf[256] = {0};
|
char buf[256] = {0};
|
||||||
EPSET_TO_STR(&pEntry->epset, buf);
|
epsetToStr(&pEntry->epset, buf, tListLen(buf));
|
||||||
mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf);
|
mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf);
|
||||||
}
|
}
|
||||||
taosHashCleanup(pHash);
|
taosHashCleanup(pHash);
|
||||||
|
|
|
@ -114,7 +114,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
|
||||||
}
|
}
|
||||||
|
|
||||||
char buf[256] = {0};
|
char buf[256] = {0};
|
||||||
EPSET_TO_STR(&entry.epset, buf);
|
epsetToStr(&entry.epset, buf, tListLen(buf));
|
||||||
|
|
||||||
mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
|
mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
|
||||||
taosArrayPush(pVgroupListSnapshot, &entry);
|
taosArrayPush(pVgroupListSnapshot, &entry);
|
||||||
|
@ -133,7 +133,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
|
||||||
entry.nodeId = SNODE_HANDLE;
|
entry.nodeId = SNODE_HANDLE;
|
||||||
|
|
||||||
char buf[256] = {0};
|
char buf[256] = {0};
|
||||||
EPSET_TO_STR(&entry.epset, buf);
|
epsetToStr(&entry.epset, buf, tListLen(buf));
|
||||||
mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
|
mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
|
||||||
taosArrayPush(pVgroupListSnapshot, &entry);
|
taosArrayPush(pVgroupListSnapshot, &entry);
|
||||||
sdbRelease(pSdb, pObj);
|
sdbRelease(pSdb, pObj);
|
||||||
|
@ -302,7 +302,7 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
|
||||||
}
|
}
|
||||||
|
|
||||||
char buf[256] = {0};
|
char buf[256] = {0};
|
||||||
EPSET_TO_STR(&epset, buf);
|
epsetToStr(&epset, buf, tListLen(buf));
|
||||||
mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf);
|
mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf);
|
||||||
|
|
||||||
code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0);
|
code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0);
|
||||||
|
|
|
@ -30,11 +30,11 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
typedef struct SSnode {
|
struct SSnode {
|
||||||
char* path;
|
char* path;
|
||||||
SStreamMeta* pMeta;
|
SStreamMeta* pMeta;
|
||||||
SMsgCb msgCb;
|
SMsgCb msgCb;
|
||||||
} SSnode;
|
};
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -501,6 +501,10 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b
|
||||||
}
|
}
|
||||||
|
|
||||||
// extract the required source task for a given stream, identified by streamId
|
// extract the required source task for a given stream, identified by streamId
|
||||||
|
streamMetaRLock(pMeta);
|
||||||
|
|
||||||
|
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||||
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
||||||
if (pId->streamId != streamId) {
|
if (pId->streamId != streamId) {
|
||||||
|
@ -552,5 +556,7 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b
|
||||||
walCloseReader(pReader);
|
walCloseReader(pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
streamMetaRUnLock(pMeta);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -807,6 +807,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
||||||
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
|
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
|
||||||
STaskStartInfo* pStartInfo = &pMeta->startInfo;
|
STaskStartInfo* pStartInfo = &pMeta->startInfo;
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
|
bool scanWal = false;
|
||||||
|
|
||||||
streamMetaWLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
if (pStartInfo->taskStarting == 1) {
|
if (pStartInfo->taskStarting == 1) {
|
||||||
|
@ -831,10 +832,18 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
|
||||||
pStartInfo->restartCount = 0;
|
pStartInfo->restartCount = 0;
|
||||||
tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId);
|
tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
scanWal = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
|
if (scanWal && (vgId != SNODE_HANDLE)) {
|
||||||
|
tqDebug("vgId:%d start scan wal for executing tasks", vgId);
|
||||||
|
tqScanWalAsync(pMeta->ahandle, true);
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -304,7 +304,6 @@ void streamMetaRemoveDB(void* arg, char* key) {
|
||||||
|
|
||||||
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage,
|
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage,
|
||||||
startComplete_fn_t fn) {
|
startComplete_fn_t fn) {
|
||||||
int32_t code = -1;
|
|
||||||
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
|
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
|
||||||
if (pMeta == NULL) {
|
if (pMeta == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -516,7 +515,6 @@ void streamMetaCloseImpl(void* arg) {
|
||||||
|
|
||||||
taosHashCleanup(pMeta->pTasksMap);
|
taosHashCleanup(pMeta->pTasksMap);
|
||||||
taosHashCleanup(pMeta->pTaskDbUnique);
|
taosHashCleanup(pMeta->pTaskDbUnique);
|
||||||
taosHashCleanup(pMeta->pUpdateTaskSet);
|
|
||||||
taosHashCleanup(pMeta->updateInfo.pTasks);
|
taosHashCleanup(pMeta->updateInfo.pTasks);
|
||||||
taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
|
taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
|
||||||
taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
|
taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
|
||||||
|
|
|
@ -398,8 +398,7 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
|
||||||
if (pTask->status.taskStatus == TASK_STATUS__HALT) {
|
if (pTask->status.taskStatus == TASK_STATUS__HALT) {
|
||||||
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0));
|
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0));
|
||||||
|
|
||||||
// halt it self for count window stream task until the related
|
// halt it self for count window stream task until the related fill history task completed.
|
||||||
// fill history task completd.
|
|
||||||
stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr,
|
stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr,
|
||||||
pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus));
|
pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus));
|
||||||
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
|
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
|
||||||
|
|
|
@ -35,7 +35,7 @@ static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEp
|
||||||
|
|
||||||
if (pTask->info.nodeId == nodeId) { // execution task should be moved away
|
if (pTask->info.nodeId == nodeId) { // execution task should be moved away
|
||||||
epsetAssign(&pTask->info.epSet, pEpSet);
|
epsetAssign(&pTask->info.epSet, pEpSet);
|
||||||
EPSET_TO_STR(pEpSet, buf)
|
epsetToStr(pEpSet, buf, tListLen(buf));
|
||||||
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf);
|
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -592,7 +592,7 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre
|
||||||
|
|
||||||
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
|
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
|
||||||
char buf[512] = {0};
|
char buf[512] = {0};
|
||||||
EPSET_TO_STR(pEpSet, buf);
|
epsetToStr(pEpSet, buf, tListLen(buf));
|
||||||
|
|
||||||
int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
|
int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||||
for (int32_t i = 0; i < numOfUpstream; ++i) {
|
for (int32_t i = 0; i < numOfUpstream; ++i) {
|
||||||
|
@ -626,7 +626,7 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo
|
||||||
|
|
||||||
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
|
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
|
||||||
char buf[512] = {0};
|
char buf[512] = {0};
|
||||||
EPSET_TO_STR(pEpSet, buf);
|
epsetToStr(pEpSet, buf, tListLen(buf));
|
||||||
int32_t id = pTask->id.taskId;
|
int32_t id = pTask->id.taskId;
|
||||||
|
|
||||||
int8_t type = pTask->outputInfo.type;
|
int8_t type = pTask->outputInfo.type;
|
||||||
|
@ -733,15 +733,12 @@ bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask) {
|
||||||
bool streamTaskSetSchedStatusWait(SStreamTask* pTask) {
|
bool streamTaskSetSchedStatusWait(SStreamTask* pTask) {
|
||||||
bool ret = false;
|
bool ret = false;
|
||||||
|
|
||||||
// double check
|
|
||||||
if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) {
|
|
||||||
taosThreadMutexLock(&pTask->lock);
|
taosThreadMutexLock(&pTask->lock);
|
||||||
if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) {
|
if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) {
|
||||||
pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
|
pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
|
||||||
ret = true;
|
ret = true;
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
}
|
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2188,7 +2188,7 @@ static void cliSchedMsgToDebug(SCliMsg* pMsg, char* label) {
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
STraceId* trace = &pMsg->msg.info.traceId;
|
STraceId* trace = &pMsg->msg.info.traceId;
|
||||||
char tbuf[512] = {0};
|
char tbuf[512] = {0};
|
||||||
EPSET_TO_STR(&pCtx->epSet, tbuf);
|
epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf));
|
||||||
tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", label, tbuf, pCtx->retryStep,
|
tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", label, tbuf, pCtx->retryStep,
|
||||||
pCtx->retryNextInterval);
|
pCtx->retryNextInterval);
|
||||||
return;
|
return;
|
||||||
|
@ -2421,7 +2421,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
if (hasEpSet) {
|
if (hasEpSet) {
|
||||||
if (rpcDebugFlag & DEBUG_TRACE) {
|
if (rpcDebugFlag & DEBUG_TRACE) {
|
||||||
char tbuf[512] = {0};
|
char tbuf[512] = {0};
|
||||||
EPSET_TO_STR(&pCtx->epSet, tbuf);
|
epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf));
|
||||||
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
|
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue