Merge pull request #9975 from taosdata/feature/3.0_liaohj
Feature/3.0 liaohj
This commit is contained in:
commit
d44039e430
|
@ -567,58 +567,58 @@ TEST(testCase, show_table_Test) {
|
|||
// taos_free_result(pRes);
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
//
|
||||
//TEST(testCase, projection_query_tables) {
|
||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
// ASSERT_NE(pConn, nullptr);
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
// taos_free_result(pRes);
|
||||
//
|
||||
//// pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)");
|
||||
//// if (taos_errno(pRes) != 0) {
|
||||
//// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
|
||||
//// }
|
||||
//// taos_free_result(pRes);
|
||||
////
|
||||
//// pRes = taos_query(pConn, "create table tu using st1 tags(1)");
|
||||
//// if (taos_errno(pRes) != 0) {
|
||||
//// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
|
||||
//// }
|
||||
//// taos_free_result(pRes);
|
||||
////
|
||||
//// for(int32_t i = 0; i < 100; ++i) {
|
||||
//// char sql[512] = {0};
|
||||
//// sprintf(sql, "insert into tu values(now+%da, %d)", i, i);
|
||||
//// TAOS_RES* p = taos_query(pConn, sql);
|
||||
//// if (taos_errno(p) != 0) {
|
||||
//// printf("failed to insert data, reason:%s\n", taos_errstr(p));
|
||||
//// }
|
||||
////
|
||||
//// taos_free_result(p);
|
||||
//// }
|
||||
//
|
||||
// pRes = taos_query(pConn, "select * from tu");
|
||||
// if (taos_errno(pRes) != 0) {
|
||||
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
|
||||
// taos_free_result(pRes);
|
||||
// ASSERT_TRUE(false);
|
||||
// }
|
||||
//
|
||||
// TAOS_ROW pRow = NULL;
|
||||
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
// int32_t numOfFields = taos_num_fields(pRes);
|
||||
//
|
||||
// char str[512] = {0};
|
||||
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
|
||||
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
||||
// printf("%s\n", str);
|
||||
// }
|
||||
//
|
||||
// taos_free_result(pRes);
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
//
|
||||
|
||||
TEST(testCase, projection_query_tables) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
ASSERT_NE(pConn, nullptr);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table tu using st1 tags(1)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
for(int32_t i = 0; i < 100000; ++i) {
|
||||
char sql[512] = {0};
|
||||
sprintf(sql, "insert into tu values(now+%da, %d)", i, i);
|
||||
TAOS_RES* p = taos_query(pConn, sql);
|
||||
if (taos_errno(p) != 0) {
|
||||
printf("failed to insert data, reason:%s\n", taos_errstr(p));
|
||||
}
|
||||
|
||||
taos_free_result(p);
|
||||
}
|
||||
|
||||
pRes = taos_query(pConn, "select * from tu");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
|
||||
taos_free_result(pRes);
|
||||
ASSERT_TRUE(false);
|
||||
}
|
||||
|
||||
TAOS_ROW pRow = NULL;
|
||||
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
int32_t numOfFields = taos_num_fields(pRes);
|
||||
|
||||
char str[512] = {0};
|
||||
while ((pRow = taos_fetch_row(pRes)) != NULL) {
|
||||
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
||||
printf("%s\n", str);
|
||||
}
|
||||
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
//TEST(testCase, projection_query_stables) {
|
||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
// ASSERT_NE(pConn, nullptr);
|
||||
|
|
|
@ -156,6 +156,11 @@ _exit:
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void freeItemHelper(void* pItem) {
|
||||
char* p = *(char**)pItem;
|
||||
free(p);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param pVnode
|
||||
* @param pMsg
|
||||
|
@ -169,17 +174,20 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
int32_t totalLen = 0;
|
||||
int32_t numOfTables = 0;
|
||||
while ((name = metaTbCursorNext(pCur)) != NULL) {
|
||||
if (numOfTables < 1000) { // TODO: temp get tables of vnode, and should del when show tables commad ok.
|
||||
if (numOfTables < 10000) { // TODO: temp get tables of vnode, and should del when show tables commad ok.
|
||||
taosArrayPush(pArray, &name);
|
||||
totalLen += strlen(name);
|
||||
} else {
|
||||
tfree(name);
|
||||
}
|
||||
|
||||
numOfTables++;
|
||||
}
|
||||
|
||||
// TODO: temp debug, and should del when show tables command ok
|
||||
vError("====vgId:%d, numOfTables: %d", pVnode->vgId, numOfTables);
|
||||
if (numOfTables > 1000) {
|
||||
numOfTables = 1000;
|
||||
vInfo("====vgId:%d, numOfTables: %d", pVnode->vgId, numOfTables);
|
||||
if (numOfTables > 10000) {
|
||||
numOfTables = 10000;
|
||||
}
|
||||
|
||||
metaCloseTbCursor(pCur);
|
||||
|
@ -214,6 +222,6 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
};
|
||||
|
||||
rpcSendResponse(&rpcMsg);
|
||||
taosArrayDestroy(pArray);
|
||||
taosArrayDestroyEx(pArray, freeItemHelper);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -372,11 +372,14 @@ typedef struct STaskParam {
|
|||
|
||||
typedef struct SExchangeInfo {
|
||||
SArray *pSources;
|
||||
uint64_t bytes; // total load bytes from remote
|
||||
tsem_t ready;
|
||||
void *pTransporter;
|
||||
SRetrieveTableRsp *pRsp;
|
||||
SSDataBlock *pResult;
|
||||
int32_t current;
|
||||
uint64_t rowsOfCurrentSource;
|
||||
uint64_t bytes; // total load bytes from remote
|
||||
uint64_t totalRows;
|
||||
} SExchangeInfo;
|
||||
|
||||
typedef struct STableScanInfo {
|
||||
|
|
|
@ -164,13 +164,6 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// STaskRuntimeEnv* pRuntimeEnv = &pTaskInfo->runtimeEnv;
|
||||
// if (pTaskInfo->tableqinfoGroupInfo.numOfTables == 0) {
|
||||
// qDebug("QInfo:0x%"PRIx64" no table exists for query, abort", GET_TASKID(pTaskInfo));
|
||||
// setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
||||
// return doBuildResCheck(pTaskInfo);
|
||||
// }
|
||||
|
||||
// error occurs, record the error code and return to client
|
||||
int32_t ret = setjmp(pTaskInfo->env);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
|
|
@ -4958,6 +4958,10 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
|
|||
STableScanInfo *pTableScanInfo = pOperator->info;
|
||||
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
if (pTableScanInfo->pTsdbReadHandle == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SResultRowInfo* pResultRowInfo = pTableScanInfo->pResultRowInfo;
|
||||
*newgroup = false;
|
||||
|
||||
|
@ -5144,72 +5148,111 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
|
|||
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
*newgroup = false;
|
||||
if (pExchangeInfo->pRsp != NULL && pExchangeInfo->pRsp->completed == 1) {
|
||||
|
||||
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
||||
if (pExchangeInfo->current >= totalSources) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SResFetchReq *pMsg = calloc(1, sizeof(SResFetchReq));
|
||||
if (NULL == pMsg) { // todo handle malloc error
|
||||
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
}
|
||||
SResFetchReq* pMsg = NULL;
|
||||
SMsgSendInfo* pMsgSendInfo = NULL;
|
||||
|
||||
SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, 0);
|
||||
SEpSet epSet = {0};
|
||||
|
||||
epSet.numOfEps = pSource->addr.numOfEps;
|
||||
epSet.port[0] = pSource->addr.epAddr[0].port;
|
||||
tstrncpy(epSet.fqdn[0], pSource->addr.epAddr[0].fqdn, tListLen(epSet.fqdn[0]));
|
||||
|
||||
pMsg->header.vgId = htonl(pSource->addr.nodeId);
|
||||
pMsg->sId = htobe64(pSource->schedId);
|
||||
pMsg->taskId = htobe64(pSource->taskId);
|
||||
pMsg->queryId = htobe64(pTaskInfo->id.queryId);
|
||||
|
||||
// send the fetch remote task result reques
|
||||
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
|
||||
if (NULL == pMsgSendInfo) {
|
||||
qError("QID:%"PRIx64" calloc %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
|
||||
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pMsgSendInfo->param = pExchangeInfo;
|
||||
pMsgSendInfo->msgInfo.pData = pMsg;
|
||||
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
|
||||
pMsgSendInfo->msgType = TDMT_VND_FETCH;
|
||||
pMsgSendInfo->fp = loadRemoteDataCallback;
|
||||
|
||||
int64_t transporterId = 0;
|
||||
int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo);
|
||||
tsem_wait(&pExchangeInfo->ready);
|
||||
|
||||
if (pExchangeInfo->pRsp->numOfRows == 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SSDataBlock* pRes = pExchangeInfo->pResult;
|
||||
char* pData = pExchangeInfo->pRsp->data;
|
||||
|
||||
for(int32_t i = 0; i < pOperator->numOfOutput; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i);
|
||||
char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * pExchangeInfo->pRsp->numOfRows);
|
||||
if (tmp == NULL) {
|
||||
while(1) {
|
||||
pMsg = calloc(1, sizeof(SResFetchReq));
|
||||
if (NULL == pMsg) { // todo handle malloc error
|
||||
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
size_t len = pExchangeInfo->pRsp->numOfRows * pColInfoData->info.bytes;
|
||||
memcpy(tmp, pData, len);
|
||||
SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
|
||||
|
||||
pColInfoData->pData = tmp;
|
||||
pData += len;
|
||||
SEpSet epSet = {0};
|
||||
epSet.numOfEps = pSource->addr.numOfEps;
|
||||
epSet.port[0] = pSource->addr.epAddr[0].port;
|
||||
tstrncpy(epSet.fqdn[0], pSource->addr.epAddr[0].fqdn, tListLen(epSet.fqdn[0]));
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, epSet.fqdn[0], pSource->taskId, pExchangeInfo->current, totalSources);
|
||||
|
||||
pMsg->header.vgId = htonl(pSource->addr.nodeId);
|
||||
pMsg->sId = htobe64(pSource->schedId);
|
||||
pMsg->taskId = htobe64(pSource->taskId);
|
||||
pMsg->queryId = htobe64(pTaskInfo->id.queryId);
|
||||
|
||||
// send the fetch remote task result reques
|
||||
pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
|
||||
if (NULL == pMsgSendInfo) {
|
||||
qError("QID:0x%" PRIx64 " prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
|
||||
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pMsgSendInfo->param = pExchangeInfo;
|
||||
pMsgSendInfo->msgInfo.pData = pMsg;
|
||||
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
|
||||
pMsgSendInfo->msgType = TDMT_VND_FETCH;
|
||||
pMsgSendInfo->fp = loadRemoteDataCallback;
|
||||
|
||||
int64_t transporterId = 0;
|
||||
int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo);
|
||||
tsem_wait(&pExchangeInfo->ready);
|
||||
|
||||
SRetrieveTableRsp* pRsp = pExchangeInfo->pRsp;
|
||||
if (pRsp->numOfRows == 0) {
|
||||
qDebug("QID:0x%"PRIx64" vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" try next",
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
|
||||
pExchangeInfo->rowsOfCurrentSource, pExchangeInfo->totalRows);
|
||||
|
||||
pExchangeInfo->rowsOfCurrentSource = 0;
|
||||
pExchangeInfo->current += 1;
|
||||
|
||||
if (pExchangeInfo->current >= totalSources) {
|
||||
return NULL;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
SSDataBlock* pRes = pExchangeInfo->pResult;
|
||||
char* pData = pRsp->data;
|
||||
|
||||
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i);
|
||||
char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * pRsp->numOfRows);
|
||||
if (tmp == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
size_t len = pRsp->numOfRows * pColInfoData->info.bytes;
|
||||
memcpy(tmp, pData, len);
|
||||
|
||||
pColInfoData->pData = tmp;
|
||||
pData += len;
|
||||
}
|
||||
|
||||
pRes->info.numOfCols = pOperator->numOfOutput;
|
||||
pRes->info.rows = pRsp->numOfRows;
|
||||
|
||||
pExchangeInfo->totalRows += pRsp->numOfRows;
|
||||
pExchangeInfo->bytes += pRsp->compLen;
|
||||
pExchangeInfo->rowsOfCurrentSource += pRsp->numOfRows;
|
||||
|
||||
if (pRsp->completed == 1) {
|
||||
qDebug("QID:0x%" PRIx64 " fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64
|
||||
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->rowsOfCurrentSource, pExchangeInfo->totalRows, pExchangeInfo->bytes,
|
||||
pExchangeInfo->current + 1, totalSources);
|
||||
|
||||
pExchangeInfo->rowsOfCurrentSource = 0;
|
||||
pExchangeInfo->current += 1;
|
||||
} else {
|
||||
qDebug("QID:0x%" PRIx64 " fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows, pExchangeInfo->bytes);
|
||||
}
|
||||
|
||||
return pExchangeInfo->pResult;
|
||||
}
|
||||
|
||||
pRes->info.numOfCols = pOperator->numOfOutput;
|
||||
pRes->info.rows = pExchangeInfo->pRsp->numOfRows;
|
||||
|
||||
return pExchangeInfo->pResult;
|
||||
|
||||
_error:
|
||||
tfree(pMsg);
|
||||
tfree(pMsgSendInfo);
|
||||
|
@ -7719,7 +7762,6 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
|
|||
SExchangePhyNode* pEx = (SExchangePhyNode*) pPhyNode;
|
||||
return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo);
|
||||
} else if (pPhyNode->info.type == OP_StreamScan) {
|
||||
size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets);
|
||||
SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; // simple child table.
|
||||
return createStreamScanOperatorInfo(readerHandle, pPhyNode->pTargets, pScanPhyNode->uid, pTaskInfo);
|
||||
}
|
||||
|
@ -7786,11 +7828,12 @@ static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode,
|
|||
|
||||
if (groupInfo.numOfTables == 0) {
|
||||
code = 0;
|
||||
// qDebug("no table qualified for query, reqId:0x%"PRIx64, (*pTask)->id.queryId);
|
||||
qDebug("no table qualified for query, reqId:0x%"PRIx64, queryId);
|
||||
goto _error;
|
||||
}
|
||||
|
||||
return createDataReadHandle(pTableScanNode, &groupInfo, readerHandle, queryId);
|
||||
|
||||
_error:
|
||||
terrno = code;
|
||||
return NULL;
|
||||
|
|
|
@ -26,7 +26,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseContext* pCtx, void** out
|
|||
const char* msg4 = "pattern is invalid";
|
||||
const char* msg5 = "database name is empty";
|
||||
const char* msg6 = "pattern string is empty";
|
||||
const char* msg7 = "db is not specified";
|
||||
const char* msg7 = "database not specified";
|
||||
/*
|
||||
* database prefix in pInfo->pMiscInfo->a[0]
|
||||
* wildcard in like clause in pInfo->pMiscInfo->a[1]
|
||||
|
@ -50,7 +50,11 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseContext* pCtx, void** out
|
|||
char dbFname[TSDB_DB_FNAME_LEN] = {0};
|
||||
tNameGetFullDbName(&name, dbFname);
|
||||
|
||||
catalogGetDBVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, dbFname, false, &array);
|
||||
int32_t code = catalogGetDBVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, dbFname, false, &array);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
return code;
|
||||
}
|
||||
|
||||
SVgroupInfo* info = taosArrayGet(array, 0);
|
||||
pShowReq->head.vgId = htonl(info->vgId);
|
||||
|
|
Loading…
Reference in New Issue