Merge remote-tracking branch 'origin/3.0' into feat/TD-24700
This commit is contained in:
commit
efc5589aa2
|
@ -168,3 +168,11 @@ All [scalar functions](../function/#scalar-functions) are available in stream pr
|
|||
- [unique](../function/#unique)
|
||||
- [mode](../function/#mode)
|
||||
|
||||
## Pause\Resume stream
|
||||
1.pause stream
|
||||
PAUSE STREAM [IF EXISTS] stream_name;
|
||||
If "IF EXISTS" is not specified and the stream does not exist, an error will be reported; If "IF EXISTS" is specified and the stream does not exist, success is returned; If the stream exists, paused all stream tasks.
|
||||
|
||||
2.resume stream
|
||||
RESUME STREAM [IF EXISTS] [IGNORE UNTREATED] stream_name;
|
||||
If "IF EXISTS" is not specified and the stream does not exist, an error will be reported. If "IF EXISTS" is specified and the stream does not exist, success is returned; If the stream exists, all of the stream tasks will be resumed. If "IGNORE UntREATED" is specified, data written during the pause period of stream is ignored when resuming stream.
|
||||
|
|
|
@ -1384,6 +1384,9 @@ typedef struct {
|
|||
int32_t vgId;
|
||||
int8_t syncState;
|
||||
int8_t syncRestore;
|
||||
int64_t syncTerm;
|
||||
int64_t roleTimeMs;
|
||||
int64_t startTimeMs;
|
||||
int8_t syncCanRead;
|
||||
int64_t cacheUsage;
|
||||
int64_t numOfTables;
|
||||
|
@ -1400,9 +1403,9 @@ typedef struct {
|
|||
} SVnodeLoad;
|
||||
|
||||
typedef struct {
|
||||
int8_t syncState;
|
||||
int8_t syncRestore;
|
||||
int8_t syncState;
|
||||
int64_t syncTerm;
|
||||
int8_t syncRestore;
|
||||
int64_t roleTimeMs;
|
||||
} SMnodeLoad;
|
||||
|
||||
|
|
|
@ -106,6 +106,8 @@ typedef struct SMCtbCursor {
|
|||
void *pVal;
|
||||
int kLen;
|
||||
int vLen;
|
||||
int8_t paused;
|
||||
int lock;
|
||||
} SMCtbCursor;
|
||||
|
||||
typedef struct SRowBuffPos {
|
||||
|
@ -295,7 +297,9 @@ int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool
|
|||
int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list);
|
||||
*/
|
||||
SMCtbCursor* (*openCtbCursor)(void *pVnode, tb_uid_t uid, int lock);
|
||||
void (*closeCtbCursor)(SMCtbCursor *pCtbCur, int lock);
|
||||
int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first);
|
||||
void (*pauseCtbCursor)(SMCtbCursor* pCtbCur);
|
||||
void (*closeCtbCursor)(SMCtbCursor *pCtbCur);
|
||||
tb_uid_t (*ctbCursorNext)(SMCtbCursor* pCur);
|
||||
} SStoreMeta;
|
||||
|
||||
|
|
|
@ -241,6 +241,7 @@ typedef struct SSyncState {
|
|||
bool canRead;
|
||||
SyncTerm term;
|
||||
int64_t roleTimeMs;
|
||||
int64_t startTimeMs;
|
||||
} SSyncState;
|
||||
|
||||
int32_t syncInit();
|
||||
|
|
|
@ -284,7 +284,6 @@ static const SSysDbTableSchema topicSchema[] = {
|
|||
{.name = "type", .bytes = 8 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||
};
|
||||
|
||||
|
||||
static const SSysDbTableSchema subscriptionSchema[] = {
|
||||
{.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||
{.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||
|
@ -295,12 +294,13 @@ static const SSysDbTableSchema subscriptionSchema[] = {
|
|||
};
|
||||
|
||||
static const SSysDbTableSchema vnodesSchema[] = {
|
||||
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
|
||||
{.name = "replica", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = true},
|
||||
{.name = "status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||
{.name = "dnode_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
|
||||
{.name = "dnode_ep", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
|
||||
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||
{.name = "status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||
{.name = "role_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
|
||||
{.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
|
||||
{.name = "restored", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL, .sysInfo = true},
|
||||
};
|
||||
|
||||
static const SSysDbTableSchema userUserPrivilegesSchema[] = {
|
||||
|
|
|
@ -1083,8 +1083,8 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
|||
if (tEncodeI64(&encoder, pload->pointsWritten) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pload->numOfCachedTables) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, reserved) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, reserved) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, reserved) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pload->roleTimeMs) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pload->startTimeMs) < 0) return -1;
|
||||
}
|
||||
|
||||
// mnode loads
|
||||
|
@ -1108,6 +1108,16 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
|||
if (tEncodeI64(&encoder, pReq->mload.syncTerm) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->mload.roleTimeMs) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->clusterCfg.ttlChangeOnWrite) < 0) return -1;
|
||||
|
||||
// vnode extra
|
||||
for (int32_t i = 0; i < vlen; ++i) {
|
||||
SVnodeLoad *pload = taosArrayGet(pReq->pVloads, i);
|
||||
int64_t reserved = 0;
|
||||
if (tEncodeI64(&encoder, pload->syncTerm) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, reserved) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, reserved) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, reserved) < 0) return -1;
|
||||
}
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
|
@ -1152,7 +1162,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
|||
|
||||
for (int32_t i = 0; i < vlen; ++i) {
|
||||
SVnodeLoad vload = {0};
|
||||
int64_t reserved64 = 0;
|
||||
vload.syncTerm = -1;
|
||||
int32_t reserved32 = 0;
|
||||
if (tDecodeI32(&decoder, &vload.vgId) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &vload.syncState) < 0) return -1;
|
||||
|
@ -1166,14 +1176,15 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
|||
if (tDecodeI64(&decoder, &vload.pointsWritten) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &vload.numOfCachedTables) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, (int32_t *)&reserved32) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &reserved64) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &reserved64) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &vload.roleTimeMs) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &vload.startTimeMs) < 0) return -1;
|
||||
if (taosArrayPush(pReq->pVloads, &vload) == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// mnode loads
|
||||
if (tDecodeI8(&decoder, &pReq->mload.syncState) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->mload.syncRestore) < 0) return -1;
|
||||
|
||||
|
@ -1204,6 +1215,17 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
|||
if (tDecodeI8(&decoder, &pReq->clusterCfg.ttlChangeOnWrite) < 0) return -1;
|
||||
}
|
||||
|
||||
// vnode extra
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
for (int32_t i = 0; i < vlen; ++i) {
|
||||
SVnodeLoad *pLoad = taosArrayGet(pReq->pVloads, i);
|
||||
int64_t reserved = 0;
|
||||
if (tDecodeI64(&decoder, &pLoad->syncTerm) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
|
||||
}
|
||||
}
|
||||
tEndDecode(&decoder);
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
|
|
|
@ -347,8 +347,11 @@ typedef struct {
|
|||
typedef struct {
|
||||
int32_t dnodeId;
|
||||
ESyncState syncState;
|
||||
int64_t syncTerm;
|
||||
bool syncRestore;
|
||||
bool syncCanRead;
|
||||
int64_t roleTimeMs;
|
||||
int64_t startTimeMs;
|
||||
ESyncRole nodeRole;
|
||||
} SVnodeGid;
|
||||
|
||||
|
|
|
@ -424,6 +424,47 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const S
|
|||
return 0;
|
||||
}
|
||||
|
||||
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
|
||||
bool stateChanged = false;
|
||||
bool roleChanged = pGid->syncState != pVload->syncState ||
|
||||
(pVload->syncTerm != -1 && pGid->syncTerm != pVload->syncTerm) ||
|
||||
pGid->roleTimeMs != pVload->roleTimeMs;
|
||||
if (roleChanged || pGid->syncRestore != pVload->syncRestore || pGid->syncCanRead != pVload->syncCanRead ||
|
||||
pGid->startTimeMs != pVload->startTimeMs) {
|
||||
mInfo(
|
||||
"vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
|
||||
"canRead:%d, dnode:%d",
|
||||
vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
|
||||
pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);
|
||||
pGid->syncState = pVload->syncState;
|
||||
pGid->syncTerm = pVload->syncTerm;
|
||||
pGid->syncRestore = pVload->syncRestore;
|
||||
pGid->syncCanRead = pVload->syncCanRead;
|
||||
pGid->startTimeMs = pVload->startTimeMs;
|
||||
pGid->roleTimeMs = pVload->roleTimeMs;
|
||||
stateChanged = true;
|
||||
}
|
||||
return stateChanged;
|
||||
}
|
||||
|
||||
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
|
||||
bool stateChanged = false;
|
||||
bool roleChanged = pObj->syncState != pMload->syncState ||
|
||||
(pMload->syncTerm != -1 && pObj->syncTerm != pMload->syncTerm) ||
|
||||
pObj->roleTimeMs != pMload->roleTimeMs;
|
||||
if (roleChanged || pObj->syncRestore != pMload->syncRestore) {
|
||||
mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
|
||||
pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
|
||||
pObj->syncTerm, pMload->syncTerm);
|
||||
pObj->syncState = pMload->syncState;
|
||||
pObj->syncTerm = pMload->syncTerm;
|
||||
pObj->syncRestore = pMload->syncRestore;
|
||||
pObj->roleTimeMs = pMload->roleTimeMs;
|
||||
stateChanged = true;
|
||||
}
|
||||
return stateChanged;
|
||||
}
|
||||
|
||||
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SStatusReq statusReq = {0};
|
||||
|
@ -496,26 +537,21 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
|
|||
pVgroup->compStorage = pVload->compStorage;
|
||||
pVgroup->pointsWritten = pVload->pointsWritten;
|
||||
}
|
||||
bool roleChanged = false;
|
||||
bool stateChanged = false;
|
||||
for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
|
||||
SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
|
||||
if (pGid->dnodeId == statusReq.dnodeId) {
|
||||
if (pGid->syncState != pVload->syncState || pGid->syncRestore != pVload->syncRestore ||
|
||||
pGid->syncCanRead != pVload->syncCanRead) {
|
||||
mInfo(
|
||||
"vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
|
||||
"canRead:%d, dnode:%d",
|
||||
pVgroup->vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead,
|
||||
syncStr(pVload->syncState), pVload->syncRestore, pVload->syncCanRead, pDnode->id);
|
||||
pGid->syncState = pVload->syncState;
|
||||
pGid->syncRestore = pVload->syncRestore;
|
||||
pGid->syncCanRead = pVload->syncCanRead;
|
||||
roleChanged = true;
|
||||
if (pVload->startTimeMs == 0) {
|
||||
pVload->startTimeMs = statusReq.rebootTime;
|
||||
}
|
||||
if (pVload->roleTimeMs == 0) {
|
||||
pVload->roleTimeMs = statusReq.rebootTime;
|
||||
}
|
||||
stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (roleChanged) {
|
||||
if (stateChanged) {
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
|
||||
if (pDb != NULL && pDb->stateTs != curMs) {
|
||||
mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
|
||||
|
@ -531,23 +567,10 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
|
|||
|
||||
SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
|
||||
if (pObj != NULL) {
|
||||
bool roleChanged = pObj->syncState != statusReq.mload.syncState ||
|
||||
(statusReq.mload.syncTerm != -1 && pObj->syncTerm != statusReq.mload.syncTerm);
|
||||
bool restoreChanged = pObj->syncRestore != statusReq.mload.syncRestore;
|
||||
if (roleChanged || restoreChanged) {
|
||||
mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64
|
||||
" to %" PRId64,
|
||||
pObj->id, syncStr(pObj->syncState), syncStr(statusReq.mload.syncState), pObj->syncRestore,
|
||||
statusReq.mload.syncRestore, pObj->syncTerm, statusReq.mload.syncTerm);
|
||||
pObj->syncState = statusReq.mload.syncState;
|
||||
pObj->syncRestore = statusReq.mload.syncRestore;
|
||||
pObj->syncTerm = statusReq.mload.syncTerm;
|
||||
if (statusReq.mload.roleTimeMs == 0) {
|
||||
statusReq.mload.roleTimeMs = statusReq.rebootTime;
|
||||
}
|
||||
|
||||
if (roleChanged) {
|
||||
pObj->roleTimeMs = (statusReq.mload.roleTimeMs != 0) ? statusReq.mload.roleTimeMs : taosGetTimestampMs();
|
||||
}
|
||||
|
||||
mndUpdateMnodeState(pObj, &statusReq.mload);
|
||||
mndReleaseMnode(pMnode, pObj);
|
||||
}
|
||||
|
||||
|
|
|
@ -185,7 +185,7 @@ static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs)
|
|||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
bool roleChanged = false;
|
||||
bool stateChanged = false;
|
||||
for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
|
||||
SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
|
||||
if (pGid->dnodeId == dnodeId) {
|
||||
|
@ -197,13 +197,14 @@ static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs)
|
|||
pGid->syncState = TAOS_SYNC_STATE_OFFLINE;
|
||||
pGid->syncRestore = 0;
|
||||
pGid->syncCanRead = 0;
|
||||
roleChanged = true;
|
||||
pGid->startTimeMs = 0;
|
||||
stateChanged = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (roleChanged) {
|
||||
if (stateChanged) {
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
|
||||
if (pDb != NULL && pDb->stateTs != curMs) {
|
||||
mInfo("db:%s, stateTs changed by offline check, old newTs:%" PRId64 " newTs:%" PRId64, pDb->name, pDb->stateTs,
|
||||
|
|
|
@ -807,7 +807,6 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
|||
ESdbStatus objStatus = 0;
|
||||
char *pWrite;
|
||||
int64_t curMs = taosGetTimestampMs();
|
||||
int64_t dummyTimeMs = 0;
|
||||
|
||||
pSelfObj = sdbAcquire(pSdb, SDB_MNODE, &pMnode->selfDnodeId);
|
||||
if (pSelfObj == NULL) {
|
||||
|
@ -858,16 +857,9 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
|||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createdTime, false);
|
||||
|
||||
int64_t roleTimeMs = (isDnodeOnline) ? pObj->roleTimeMs : 0;
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
if (pObj->syncTerm != pSelfObj->syncTerm || !isDnodeOnline) {
|
||||
// state of old term / no status report => use dummyTimeMs
|
||||
if (pObj->syncTerm > pSelfObj->syncTerm) {
|
||||
mError("mnode:%d has a newer term:%" PRId64 " than me:%" PRId64, pObj->id, pObj->syncTerm, pSelfObj->syncTerm);
|
||||
}
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&dummyTimeMs, false);
|
||||
} else {
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->roleTimeMs, false);
|
||||
}
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&roleTimeMs, false);
|
||||
|
||||
numOfRows++;
|
||||
sdbRelease(pSdb, pObj);
|
||||
|
|
|
@ -961,27 +961,24 @@ static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
|||
int32_t numOfRows = 0;
|
||||
SVgObj *pVgroup = NULL;
|
||||
int32_t cols = 0;
|
||||
int64_t curMs = taosGetTimestampMs();
|
||||
|
||||
while (numOfRows < rows) {
|
||||
pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
|
||||
if (pShow->pIter == NULL) break;
|
||||
|
||||
for (int32_t i = 0; i < pVgroup->replica && numOfRows < rows; ++i) {
|
||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
|
||||
SVnodeGid *pGid = &pVgroup->vnodeGid[i];
|
||||
SColumnInfoData *pColInfo = NULL;
|
||||
cols = 0;
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->dnodeId, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->vgId, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->replica, false);
|
||||
|
||||
char buf[20] = {0};
|
||||
STR_TO_VARSTR(buf, syncStr(pVgid->syncState));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
|
||||
|
||||
// db_name
|
||||
const char *dbname = mndGetDbStr(pVgroup->dbName);
|
||||
char b1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
if (dbname != NULL) {
|
||||
|
@ -992,20 +989,33 @@ static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
|||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)b1, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgid->dnodeId, false);
|
||||
|
||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
|
||||
char b2[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
if (pDnode != NULL) {
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(b2, pDnode->ep, TSDB_EP_LEN + VARSTR_HEADER_SIZE);
|
||||
} else {
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(b2, "NULL", TSDB_EP_LEN + VARSTR_HEADER_SIZE);
|
||||
// dnode is online?
|
||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
|
||||
if (pDnode == NULL) {
|
||||
mError("failed to acquire dnode. dnodeId:%d", pGid->dnodeId);
|
||||
break;
|
||||
}
|
||||
bool isDnodeOnline = mndIsDnodeOnline(pDnode, curMs);
|
||||
|
||||
char buf[20] = {0};
|
||||
ESyncState syncState = (isDnodeOnline) ? pGid->syncState : TAOS_SYNC_STATE_OFFLINE;
|
||||
STR_TO_VARSTR(buf, syncStr(syncState));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)b2, false);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
|
||||
|
||||
int64_t roleTimeMs = (isDnodeOnline) ? pGid->roleTimeMs : 0;
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&roleTimeMs, false);
|
||||
|
||||
int64_t startTimeMs = (isDnodeOnline) ? pGid->startTimeMs : 0;
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&startTimeMs, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->syncRestore, false);
|
||||
|
||||
numOfRows++;
|
||||
sdbRelease(pSdb, pDnode);
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
|
|
|
@ -168,7 +168,9 @@ int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
|
|||
|
||||
int64_t metaGetTimeSeriesNum(SMeta* pMeta);
|
||||
SMCtbCursor* metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock);
|
||||
void metaCloseCtbCursor(SMCtbCursor* pCtbCur, int lock);
|
||||
int32_t metaResumeCtbCursor(SMCtbCursor* pCtbCur, int8_t first);
|
||||
void metaPauseCtbCursor(SMCtbCursor* pCtbCur);
|
||||
void metaCloseCtbCursor(SMCtbCursor* pCtbCur);
|
||||
tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
|
||||
SMStbCursor* metaOpenStbCursor(SMeta* pMeta, tb_uid_t uid);
|
||||
void metaCloseStbCursor(SMStbCursor* pStbCur);
|
||||
|
|
|
@ -423,40 +423,75 @@ SMCtbCursor *metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock) {
|
|||
|
||||
pCtbCur->pMeta = pMeta;
|
||||
pCtbCur->suid = uid;
|
||||
if (lock) {
|
||||
metaRLock(pMeta);
|
||||
}
|
||||
pCtbCur->lock = lock;
|
||||
pCtbCur->paused = 1;
|
||||
|
||||
ret = tdbTbcOpen(pMeta->pCtbIdx, (TBC**)&pCtbCur->pCur, NULL);
|
||||
ret = metaResumeCtbCursor(pCtbCur, 1);
|
||||
if (ret < 0) {
|
||||
metaULock(pMeta);
|
||||
taosMemoryFree(pCtbCur);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// move to the suid
|
||||
ctbIdxKey.suid = uid;
|
||||
ctbIdxKey.uid = INT64_MIN;
|
||||
tdbTbcMoveTo(pCtbCur->pCur, &ctbIdxKey, sizeof(ctbIdxKey), &c);
|
||||
if (c > 0) {
|
||||
tdbTbcMoveToNext(pCtbCur->pCur);
|
||||
}
|
||||
|
||||
return pCtbCur;
|
||||
}
|
||||
|
||||
void metaCloseCtbCursor(SMCtbCursor *pCtbCur, int lock) {
|
||||
void metaCloseCtbCursor(SMCtbCursor *pCtbCur) {
|
||||
if (pCtbCur) {
|
||||
if (pCtbCur->pMeta && lock) metaULock(pCtbCur->pMeta);
|
||||
if (pCtbCur->pCur) {
|
||||
tdbTbcClose(pCtbCur->pCur);
|
||||
if (!pCtbCur->paused) {
|
||||
if (pCtbCur->pMeta && pCtbCur->lock) metaULock(pCtbCur->pMeta);
|
||||
if (pCtbCur->pCur) {
|
||||
tdbTbcClose(pCtbCur->pCur);
|
||||
}
|
||||
}
|
||||
tdbFree(pCtbCur->pKey);
|
||||
tdbFree(pCtbCur->pVal);
|
||||
}
|
||||
taosMemoryFree(pCtbCur);
|
||||
}
|
||||
|
||||
tdbFree(pCtbCur->pKey);
|
||||
tdbFree(pCtbCur->pVal);
|
||||
void metaPauseCtbCursor(SMCtbCursor* pCtbCur) {
|
||||
if (!pCtbCur->paused) {
|
||||
tdbTbcClose((TBC*)pCtbCur->pCur);
|
||||
if (pCtbCur->lock) {
|
||||
metaULock(pCtbCur->pMeta);
|
||||
}
|
||||
pCtbCur->paused = 1;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t metaResumeCtbCursor(SMCtbCursor* pCtbCur, int8_t first) {
|
||||
if (pCtbCur->paused) {
|
||||
pCtbCur->paused = 0;
|
||||
|
||||
if (pCtbCur->lock) {
|
||||
metaRLock(pCtbCur->pMeta);
|
||||
}
|
||||
int ret = 0;
|
||||
ret = tdbTbcOpen(pCtbCur->pMeta->pCtbIdx, (TBC**)&pCtbCur->pCur, NULL);
|
||||
if (ret < 0) {
|
||||
metaCloseCtbCursor(pCtbCur);
|
||||
return -1;
|
||||
}
|
||||
|
||||
taosMemoryFree(pCtbCur);
|
||||
if (first) {
|
||||
SCtbIdxKey ctbIdxKey;
|
||||
// move to the suid
|
||||
ctbIdxKey.suid = pCtbCur->suid;
|
||||
ctbIdxKey.uid = INT64_MIN;
|
||||
int c = 0;
|
||||
tdbTbcMoveTo(pCtbCur->pCur, &ctbIdxKey, sizeof(ctbIdxKey), &c);
|
||||
if (c > 0) {
|
||||
tdbTbcMoveToNext(pCtbCur->pCur);
|
||||
}
|
||||
} else {
|
||||
int c = 0;
|
||||
ret = tdbTbcMoveTo(pCtbCur->pCur, pCtbCur->pKey, pCtbCur->kLen, &c);
|
||||
if (c < 0) {
|
||||
tdbTbcMoveToPrev(pCtbCur->pCur);
|
||||
} else {
|
||||
tdbTbcMoveToNext(pCtbCur->pCur);
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
|
||||
|
@ -1414,7 +1449,7 @@ int32_t metaGetTableTags(void *pVnode, uint64_t suid, SArray *pUidTagInfo) {
|
|||
}
|
||||
|
||||
taosHashCleanup(pSepecifiedUidMap);
|
||||
metaCloseCtbCursor(pCur, 1);
|
||||
metaCloseCtbCursor(pCur);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -98,6 +98,8 @@ void initMetadataAPI(SStoreMeta* pMeta) {
|
|||
pMeta->metaPutTbGroupToCache = metaPutTbGroupToCache;
|
||||
|
||||
pMeta->openCtbCursor = metaOpenCtbCursor;
|
||||
pMeta->resumeCtbCursor = metaResumeCtbCursor;
|
||||
pMeta->pauseCtbCursor = metaPauseCtbCursor;
|
||||
pMeta->closeCtbCursor = metaCloseCtbCursor;
|
||||
pMeta->ctbCursorNext = metaCtbCursorNext;
|
||||
}
|
||||
|
|
|
@ -380,6 +380,9 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
|
|||
pLoad->vgId = TD_VID(pVnode);
|
||||
pLoad->syncState = state.state;
|
||||
pLoad->syncRestore = state.restored;
|
||||
pLoad->syncTerm = state.term;
|
||||
pLoad->roleTimeMs = state.roleTimeMs;
|
||||
pLoad->startTimeMs = state.startTimeMs;
|
||||
pLoad->syncCanRead = state.canRead;
|
||||
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
|
||||
pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode);
|
||||
|
@ -452,7 +455,7 @@ int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) {
|
|||
taosArrayPush(list, &info);
|
||||
}
|
||||
|
||||
metaCloseCtbCursor(pCur, 1);
|
||||
metaCloseCtbCursor(pCur);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -473,7 +476,7 @@ int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list) {
|
|||
taosArrayPush(list, &id);
|
||||
}
|
||||
|
||||
metaCloseCtbCursor(pCur, 1);
|
||||
metaCloseCtbCursor(pCur);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -536,7 +539,7 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) {
|
|||
++(*num);
|
||||
}
|
||||
|
||||
metaCloseCtbCursor(pCur, 0);
|
||||
metaCloseCtbCursor(pCur);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -117,7 +117,7 @@ void* tableListDestroy(STableListInfo* pTableListInfo);
|
|||
void tableListClear(STableListInfo* pTableListInfo);
|
||||
int32_t tableListGetOutputGroups(const STableListInfo* pTableList);
|
||||
bool oneTableForEachGroup(const STableListInfo* pTableList);
|
||||
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid);
|
||||
uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tableUid);
|
||||
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid);
|
||||
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo,
|
||||
int32_t* num);
|
||||
|
|
|
@ -218,7 +218,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pRes->info.id.groupId = getTableGroupId(pTableList, pRes->info.id.uid);
|
||||
pRes->info.id.groupId = tableListGetTableGroupId(pTableList, pRes->info.id.uid);
|
||||
pInfo->indexOfBufferedRes += 1;
|
||||
return pRes;
|
||||
} else {
|
||||
|
|
|
@ -1951,7 +1951,7 @@ void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psu
|
|||
*type = pTableList->idInfo.tableType;
|
||||
}
|
||||
|
||||
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
|
||||
uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
|
||||
int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
|
||||
ASSERT(pTableList->map != NULL && slot != NULL);
|
||||
|
||||
|
|
|
@ -693,7 +693,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
if (pBlock->info.id.uid) {
|
||||
pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
|
||||
pBlock->info.id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
|
||||
}
|
||||
|
||||
uint32_t status = 0;
|
||||
|
@ -1195,7 +1195,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
|
|||
if (hasNext) {
|
||||
/*SSDataBlock* p = */ pAPI->tsdReader.tsdReaderRetrieveDataBlock(pReader, NULL);
|
||||
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
|
||||
pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
|
||||
pBlock->info.id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
|
||||
}
|
||||
|
||||
pAPI->tsdReader.tsdReaderClose(pReader);
|
||||
|
@ -1217,7 +1217,7 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts,
|
|||
|
||||
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
|
||||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||
return getTableGroupId(pTableScanInfo->base.pTableListInfo, uid);
|
||||
return tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, uid);
|
||||
}
|
||||
|
||||
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
|
||||
|
@ -1758,7 +1758,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
|||
pBlockInfo->version = pBlock->info.version;
|
||||
|
||||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||
pBlockInfo->id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
|
||||
pBlockInfo->id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
|
||||
|
||||
// todo extract method
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
|
||||
|
@ -2873,11 +2873,6 @@ static void tagScanFillOneCellWithTag(SOperatorInfo* pOperator, const STUidTagIn
|
|||
if (QUERY_NODE_FUNCTION == pExprInfo->pExpr->nodeType) {
|
||||
if (FUNCTION_TYPE_TBNAME == pExprInfo->pExpr->_function.functionType) { // tbname
|
||||
char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
// if (pUidTagInfo->name != NULL) {
|
||||
// STR_TO_VARSTR(str, pUidTagInfo->name);
|
||||
// } else { // name is not retrieved during filter
|
||||
// pAPI->metaFn.getTableNameByUid(pVnode, pUidTagInfo->uid, str);
|
||||
// }
|
||||
STR_TO_VARSTR(str, "ctbidx");
|
||||
|
||||
colDataSetVal(pColInfo, rowIndex, str, false);
|
||||
|
@ -2950,13 +2945,15 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
|
|||
blockDataCleanup(pRes);
|
||||
|
||||
if (pInfo->pCtbCursor == NULL) {
|
||||
pInfo->pCtbCursor = pAPI->metaFn.openCtbCursor(pInfo->readHandle.vnode, pInfo->suid, 0);
|
||||
pInfo->pCtbCursor = pAPI->metaFn.openCtbCursor(pInfo->readHandle.vnode, pInfo->suid, 1);
|
||||
} else {
|
||||
pAPI->metaFn.resumeCtbCursor(pInfo->pCtbCursor, 0);
|
||||
}
|
||||
|
||||
SArray* aUidTags = pInfo->aUidTags;
|
||||
SArray* aFilterIdxs = pInfo->aFilterIdxs;
|
||||
int32_t count = 0;
|
||||
|
||||
bool ctbCursorFinished = false;
|
||||
while (1) {
|
||||
taosArrayClearEx(aUidTags, tagScanFreeUidTag);
|
||||
taosArrayClear(aFilterIdxs);
|
||||
|
@ -2966,6 +2963,7 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
|
|||
SMCtbCursor* pCur = pInfo->pCtbCursor;
|
||||
tb_uid_t uid = pAPI->metaFn.ctbCursorNext(pInfo->pCtbCursor);
|
||||
if (uid == 0) {
|
||||
ctbCursorFinished = true;
|
||||
break;
|
||||
}
|
||||
STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal};
|
||||
|
@ -2994,7 +2992,15 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
|
|||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (count > 0) {
|
||||
pAPI->metaFn.pauseCtbCursor(pInfo->pCtbCursor);
|
||||
}
|
||||
if (count == 0 || ctbCursorFinished) {
|
||||
pAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor);
|
||||
pInfo->pCtbCursor = NULL;
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
pRes->info.rows = count;
|
||||
pOperator->resultInfo.totalRows += count;
|
||||
return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||
|
@ -3059,7 +3065,7 @@ static SSDataBlock* doTagScanFromMetaEntry(SOperatorInfo* pOperator) {
|
|||
static void destroyTagScanOperatorInfo(void* param) {
|
||||
STagScanInfo* pInfo = (STagScanInfo*)param;
|
||||
if (pInfo->pCtbCursor != NULL) {
|
||||
pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor, 0);
|
||||
pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor);
|
||||
}
|
||||
taosHashCleanup(pInfo->filterCtx.colHash);
|
||||
taosArrayDestroy(pInfo->filterCtx.cInfoList);
|
||||
|
@ -3189,7 +3195,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
|
|||
continue;
|
||||
}
|
||||
|
||||
pBlock->info.id.groupId = getTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
|
||||
pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
|
||||
|
||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
|
|
@ -144,6 +144,7 @@ SNode* addRangeClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pRange);
|
|||
SNode* addEveryClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pEvery);
|
||||
SNode* addFillClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pFill);
|
||||
SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pProjectionList, SNode* pTable, SNodeList* pHint);
|
||||
SNode* setSelectStmtTagMode(SAstCreateContext* pCxt, SNode* pStmt, bool bSelectTags);
|
||||
SNode* createSetOperator(SAstCreateContext* pCxt, ESetOperatorType type, SNode* pLeft, SNode* pRight);
|
||||
|
||||
SDataType createDataType(uint8_t type);
|
||||
|
|
|
@ -475,8 +475,8 @@ cmd ::= SHOW TAGS FROM table_name_cond(A) from_db_opt(B).
|
|||
cmd ::= SHOW TAGS FROM db_name(B) NK_DOT table_name(A). { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_TAGS_STMT, createIdentifierValueNode(pCxt, &B), createIdentifierValueNode(pCxt, &A), OP_TYPE_EQUAL); }
|
||||
cmd ::= SHOW TABLE TAGS tag_list_opt(C) FROM table_name_cond(A) from_db_opt(B). { pCxt->pRootNode = createShowTableTagsStmt(pCxt, A, B, C); }
|
||||
cmd ::= SHOW TABLE TAGS tag_list_opt(C) FROM db_name(B) NK_DOT table_name(A). { pCxt->pRootNode = createShowTableTagsStmt(pCxt, createIdentifierValueNode(pCxt, &A), createIdentifierValueNode(pCxt, &B), C); }
|
||||
cmd ::= SHOW VNODES NK_INTEGER(A). { pCxt->pRootNode = createShowVnodesStmt(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &A), NULL); }
|
||||
cmd ::= SHOW VNODES NK_STRING(A). { pCxt->pRootNode = createShowVnodesStmt(pCxt, NULL, createValueNode(pCxt, TSDB_DATA_TYPE_VARCHAR, &A)); }
|
||||
cmd ::= SHOW VNODES ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createShowVnodesStmt(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &A), NULL); }
|
||||
cmd ::= SHOW VNODES. { pCxt->pRootNode = createShowVnodesStmt(pCxt, NULL, NULL); }
|
||||
// show alive
|
||||
cmd ::= SHOW db_name_cond_opt(A) ALIVE. { pCxt->pRootNode = createShowAliveStmt(pCxt, A, QUERY_NODE_SHOW_DB_ALIVE_STMT); }
|
||||
cmd ::= SHOW CLUSTER ALIVE. { pCxt->pRootNode = createShowAliveStmt(pCxt, NULL, QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT); }
|
||||
|
@ -1009,10 +1009,11 @@ join_type(A) ::= INNER.
|
|||
|
||||
/************************************************ query_specification *************************************************/
|
||||
query_specification(A) ::=
|
||||
SELECT hint_list(M) set_quantifier_opt(B) select_list(C) from_clause_opt(D)
|
||||
where_clause_opt(E) partition_by_clause_opt(F) range_opt(J) every_opt(K)
|
||||
fill_opt(L) twindow_clause_opt(G) group_by_clause_opt(H) having_clause_opt(I). {
|
||||
SELECT hint_list(M) tag_mode_opt(N) set_quantifier_opt(B) select_list(C) from_clause_opt(D)
|
||||
where_clause_opt(E) partition_by_clause_opt(F) range_opt(J) every_opt(K)
|
||||
fill_opt(L) twindow_clause_opt(G) group_by_clause_opt(H) having_clause_opt(I). {
|
||||
A = createSelectStmt(pCxt, B, C, D, M);
|
||||
A = setSelectStmtTagMode(pCxt, A, N);
|
||||
A = addWhereClause(pCxt, A, E);
|
||||
A = addPartitionByClause(pCxt, A, F);
|
||||
A = addWindowClauseClause(pCxt, A, G);
|
||||
|
@ -1028,6 +1029,11 @@ query_specification(A) ::=
|
|||
hint_list(A) ::= . { A = createHintNodeList(pCxt, NULL); }
|
||||
hint_list(A) ::= NK_HINT(B). { A = createHintNodeList(pCxt, &B); }
|
||||
|
||||
%type tag_mode_opt { bool }
|
||||
%destructor tag_mode_opt { }
|
||||
tag_mode_opt(A) ::= . { A = false; }
|
||||
tag_mode_opt(A) ::= TAGS. { A = true; }
|
||||
|
||||
%type set_quantifier_opt { bool }
|
||||
%destructor set_quantifier_opt { }
|
||||
set_quantifier_opt(A) ::= . { A = false; }
|
||||
|
|
|
@ -969,6 +969,13 @@ SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pPr
|
|||
return select;
|
||||
}
|
||||
|
||||
SNode* setSelectStmtTagMode(SAstCreateContext* pCxt, SNode* pStmt, bool bSelectTags) {
|
||||
if (pStmt && QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
|
||||
((SSelectStmt*)pStmt)->tagScan = bSelectTags;
|
||||
}
|
||||
return pStmt;
|
||||
}
|
||||
|
||||
static void setSubquery(SNode* pStmt) {
|
||||
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
|
||||
((SSelectStmt*)pStmt)->isSubquery = true;
|
||||
|
|
|
@ -8078,8 +8078,6 @@ static int32_t rewriteShowVnodes(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (NULL != pShow->pDnodeId) {
|
||||
code = createOperatorNode(OP_TYPE_EQUAL, "dnode_id", pShow->pDnodeId, &pStmt->pWhere);
|
||||
} else {
|
||||
code = createOperatorNode(OP_TYPE_EQUAL, "dnode_ep", pShow->pDnodeEndpoint, &pStmt->pWhere);
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -239,9 +239,9 @@ TEST_F(ParserShowToUseTest, showVgroups) {
|
|||
TEST_F(ParserShowToUseTest, showVnodes) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("SHOW VNODES 1");
|
||||
run("SHOW VNODES ON DNODE 1");
|
||||
|
||||
run("SHOW VNODES 'node1:7030'");
|
||||
run("SHOW VNODES");
|
||||
}
|
||||
|
||||
TEST_F(ParserShowToUseTest, splitVgroup) {
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
*/
|
||||
|
||||
#include "planInt.h"
|
||||
|
||||
#include "filter.h"
|
||||
#include "functionMgt.h"
|
||||
|
||||
typedef struct SLogicPlanContext {
|
||||
|
@ -256,7 +256,7 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols
|
|||
return SCAN_TYPE_SYSTEM_TABLE;
|
||||
}
|
||||
|
||||
if (tagScan) {
|
||||
if (tagScan && 0 == LIST_LENGTH(pScanCols) && 0 != LIST_LENGTH(pScanPseudoCols)) {
|
||||
return SCAN_TYPE_TAG;
|
||||
}
|
||||
|
||||
|
@ -347,6 +347,55 @@ static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealT
|
|||
|
||||
static bool needScanDefaultCol(EScanType scanType) { return SCAN_TYPE_TABLE_COUNT != scanType; }
|
||||
|
||||
static EDealRes tagScanNodeHasTbnameFunc(SNode* pNode, void* pContext) {
|
||||
if (QUERY_NODE_FUNCTION == nodeType(pNode) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pNode)->funcType ||
|
||||
(QUERY_NODE_COLUMN == nodeType(pNode) && COLUMN_TYPE_TBNAME == ((SColumnNode*)pNode)->colType)) {
|
||||
*(bool*)pContext = true;
|
||||
return DEAL_RES_END;
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static bool tagScanNodeListHasTbname(SNodeList* pCols) {
|
||||
bool hasTbname = false;
|
||||
nodesWalkExprs(pCols, tagScanNodeHasTbnameFunc, &hasTbname);
|
||||
return hasTbname;
|
||||
}
|
||||
|
||||
static bool tagScanNodeHasTbname(SNode* pKeys) {
|
||||
bool hasTbname = false;
|
||||
nodesWalkExpr(pKeys, tagScanNodeHasTbnameFunc, &hasTbname);
|
||||
return hasTbname;
|
||||
}
|
||||
|
||||
static int32_t tagScanSetExecutionMode(SScanLogicNode* pScan) {
|
||||
pScan->onlyMetaCtbIdx = false;
|
||||
|
||||
if (tagScanNodeListHasTbname(pScan->pScanPseudoCols)) {
|
||||
pScan->onlyMetaCtbIdx = false;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pScan->node.pConditions == NULL) {
|
||||
pScan->onlyMetaCtbIdx = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SNode* pCond = nodesCloneNode(pScan->node.pConditions);
|
||||
SNode* pTagCond = NULL;
|
||||
SNode* pTagIndexCond = NULL;
|
||||
filterPartitionCond(&pCond, NULL, &pTagIndexCond, &pTagCond, NULL);
|
||||
if (pTagIndexCond || tagScanNodeHasTbname(pTagCond)) {
|
||||
pScan->onlyMetaCtbIdx = false;
|
||||
} else {
|
||||
pScan->onlyMetaCtbIdx = true;
|
||||
}
|
||||
nodesDestroyNode(pCond);
|
||||
nodesDestroyNode(pTagIndexCond);
|
||||
nodesDestroyNode(pTagCond);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SRealTableNode* pRealTable,
|
||||
SLogicNode** pLogicNode) {
|
||||
SScanLogicNode* pScan = NULL;
|
||||
|
@ -422,6 +471,10 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
|||
code = createColumnByRewriteExprs(pScan->pScanPseudoCols, &pScan->node.pTargets);
|
||||
}
|
||||
|
||||
if (pScan->scanType == SCAN_TYPE_TAG) {
|
||||
code = tagScanSetExecutionMode(pScan);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pLogicNode = (SLogicNode*)pScan;
|
||||
} else {
|
||||
|
|
|
@ -1689,7 +1689,8 @@ static bool planOptNodeListHasTbname(SNodeList* pKeys) {
|
|||
|
||||
static bool partTagsIsOptimizableNode(SLogicNode* pNode) {
|
||||
bool ret = 1 == LIST_LENGTH(pNode->pChildren) &&
|
||||
QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0));
|
||||
QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0)) &&
|
||||
SCAN_TYPE_TAG != ((SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0))->scanType;
|
||||
if (!ret) return ret;
|
||||
switch (nodeType(pNode)) {
|
||||
case QUERY_NODE_LOGIC_PLAN_PARTITION: {
|
||||
|
|
|
@ -509,6 +509,7 @@ SSyncState syncGetState(int64_t rid) {
|
|||
if (pSyncNode != NULL) {
|
||||
state.state = pSyncNode->state;
|
||||
state.roleTimeMs = pSyncNode->roleTimeMs;
|
||||
state.startTimeMs = pSyncNode->startTime;
|
||||
state.restored = pSyncNode->restoreFinish;
|
||||
if (pSyncNode->vgId != 1) {
|
||||
state.canRead = syncNodeIsReadyForRead(pSyncNode);
|
||||
|
|
|
@ -6,6 +6,10 @@
|
|||
,,y,unit-test,bash test.sh
|
||||
|
||||
#system test
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 2
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 4
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py
|
||||
|
|
|
@ -107,35 +107,39 @@ if $data30 != 12 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
print =============== show vnodes
|
||||
sql show vnodes 1
|
||||
print =============== show vnodes on dnode 1
|
||||
sql show vnodes on dnode 1
|
||||
if $rows != 9 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data(4)[1] != 1 then
|
||||
return -1
|
||||
if $data10 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data(4)[2] != leader then
|
||||
return -1
|
||||
if $data11 != 5 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data(4)[3] != d2 then
|
||||
return -1
|
||||
if $data12 != d2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data(4)[4] != 1 then
|
||||
return -1
|
||||
if $data13 != leader then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data(4)[5] != localhost:7100 then
|
||||
return -1
|
||||
print $data14
|
||||
print $data15
|
||||
|
||||
if $data16 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql show vnodes 'localhost:7100'
|
||||
print ================ show vnodes
|
||||
sql show vnodes
|
||||
if $rows != 9 then
|
||||
return -1
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== drop database
|
||||
|
|
|
@ -45,4 +45,86 @@ if $rows != 4 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql create table stt1(ts timestamp, f int) tags (t int, b varchar(10));
|
||||
sql insert into ctt11 using stt1 tags(1, '1aa') values(now, 1);
|
||||
sql insert into ctt12 using stt1 tags(2, '1bb') values(now, 2);
|
||||
sql insert into ctt13 using stt1 tags(3, '1cc') values(now, 3);
|
||||
sql insert into ctt14 using stt1 tags(4, '1dd') values(now, 4);
|
||||
sql insert into ctt14 values(now, 5);
|
||||
|
||||
sql create table stt2(ts timestamp, f int) tags (t int, b varchar(10));
|
||||
sql insert into ctt21 using stt2 tags(1, '2aa') values(now, 1);
|
||||
sql insert into ctt22 using stt2 tags(2, '2bb') values(now, 2);
|
||||
sql insert into ctt23 using stt2 tags(3, '2cc') values(now, 3);
|
||||
sql insert into ctt24 using stt2 tags(4, '2dd') values(now, 4);
|
||||
|
||||
sql select tags t, b from stt1 order by t
|
||||
print $rows
|
||||
print $data00 $data01 $data10 $data11 $data20 $data21 $data30 $data31
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != @1dd@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select tags t, b from stt2 order by t
|
||||
print $rows
|
||||
print $data00 $data01 $data10 $data11 $data20 $data21 $data30 $data31
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != @2dd@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select tags t,b,f from stt1 order by t
|
||||
print $rows
|
||||
print $data00 $data01 $data02 $data10 $data11 $data12 $data20 $data21 $data22 $data30 $data31 $data32 $data40 $data41 $data42
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
if $data42 != 5 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select tags tbname,t,b from stt1 order by t
|
||||
print $rows
|
||||
print $data00 $data01 $data02 $data10 $data11 $data12 $data20 $data21 $data22 $data30 $data31 $data32
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != @ctt14@ then
|
||||
return -1
|
||||
endi
|
||||
if $data32 != @1dd@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select tags t,b from stt1 where t=1
|
||||
print $rows
|
||||
print $data00 $data01
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != @1@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != @1aa@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select tags t,b from stt1 where tbname='ctt11'
|
||||
print $rows
|
||||
print $data00 $data01
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != @1@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != @1aa@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
|
@ -128,7 +128,7 @@ class TDTestCase:
|
|||
continue
|
||||
else:
|
||||
tdLog.exit(f"show create database check failed with {key} {value}")
|
||||
tdSql.query('show vnodes 1')
|
||||
tdSql.query('show vnodes on dnode 1')
|
||||
tdSql.checkRows(self.vgroups)
|
||||
tdSql.execute(f'use {self.dbname}')
|
||||
|
||||
|
|
Loading…
Reference in New Issue