Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/two_versions
This commit is contained in:
commit
b40e3671a4
|
@ -401,13 +401,12 @@ typedef struct SStreamBlockScanInfo {
|
||||||
} SStreamBlockScanInfo;
|
} SStreamBlockScanInfo;
|
||||||
|
|
||||||
typedef struct SSysTableScanInfo {
|
typedef struct SSysTableScanInfo {
|
||||||
SReadHandle readHandle;
|
|
||||||
|
|
||||||
SRetrieveMetaTableRsp* pRsp;
|
SRetrieveMetaTableRsp* pRsp;
|
||||||
SRetrieveTableReq req;
|
SRetrieveTableReq req;
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
tsem_t ready;
|
tsem_t ready;
|
||||||
|
|
||||||
|
SReadHandle readHandle;
|
||||||
int32_t accountId;
|
int32_t accountId;
|
||||||
bool showRewrite;
|
bool showRewrite;
|
||||||
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
|
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
|
||||||
|
|
|
@ -306,17 +306,21 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock)
|
||||||
const char* p = NULL;
|
const char* p = NULL;
|
||||||
if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
|
if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
|
||||||
const uint8_t* tmp = mr.me.ctbEntry.pTags;
|
const uint8_t* tmp = mr.me.ctbEntry.pTags;
|
||||||
|
|
||||||
char* data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1);
|
char* data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1);
|
||||||
if (data == NULL) {
|
if (data == NULL) {
|
||||||
|
metaReaderClear(&mr);
|
||||||
qError("doTagScan calloc error:%d", kvRowLen(tmp) + 1);
|
qError("doTagScan calloc error:%d", kvRowLen(tmp) + 1);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
*data = TSDB_DATA_TYPE_JSON;
|
*data = TSDB_DATA_TYPE_JSON;
|
||||||
memcpy(data + 1, tmp, kvRowLen(tmp));
|
memcpy(data + 1, tmp, kvRowLen(tmp));
|
||||||
p = data;
|
p = data;
|
||||||
} else {
|
} else {
|
||||||
p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
|
p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||||
colDataAppend(pColInfoData, i, p, (p == NULL));
|
colDataAppend(pColInfoData, i, p, (p == NULL));
|
||||||
}
|
}
|
||||||
|
@ -1216,18 +1220,18 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
||||||
// retrieve local table list info from vnode
|
// retrieve local table list info from vnode
|
||||||
const char* name = tNameGetTableName(&pInfo->name);
|
const char* name = tNameGetTableName(&pInfo->name);
|
||||||
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
|
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
|
||||||
// the retrieve is executed on the mnode, so return tables that belongs to the information schema database.
|
|
||||||
if (pInfo->readHandle.mnd != NULL) {
|
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// the retrieve is executed on the mnode, so return tables that belongs to the information schema database.
|
||||||
|
if (pInfo->readHandle.mnd != NULL) {
|
||||||
buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity);
|
buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
doFilterResult(pInfo);
|
doFilterResult(pInfo);
|
||||||
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
||||||
|
|
||||||
pOperator->status = OP_EXEC_DONE;
|
doSetOperatorCompleted(pOperator);
|
||||||
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||||
} else {
|
} else {
|
||||||
if (pInfo->pCur == NULL) {
|
if (pInfo->pCur == NULL) {
|
||||||
|
@ -1253,7 +1257,9 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
||||||
blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
while (metaTbCursorNext(pInfo->pCur) == 0) {
|
|
||||||
|
int32_t ret = 0;
|
||||||
|
while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) {
|
||||||
STR_TO_VARSTR(n, pInfo->pCur->mr.me.name);
|
STR_TO_VARSTR(n, pInfo->pCur->mr.me.name);
|
||||||
|
|
||||||
// table name
|
// table name
|
||||||
|
@ -1336,6 +1342,13 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
|
||||||
|
if (ret != 0) {
|
||||||
|
metaCloseTbCursor(pInfo->pCur);
|
||||||
|
pInfo->pCur = NULL;
|
||||||
|
doSetOperatorCompleted(pOperator);
|
||||||
|
}
|
||||||
|
|
||||||
p->info.rows = numOfRows;
|
p->info.rows = numOfRows;
|
||||||
pInfo->pRes->info.rows = numOfRows;
|
pInfo->pRes->info.rows = numOfRows;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue