Merge remote-tracking branch 'origin/3.0' into feature/warning
This commit is contained in:
commit
fab317226f
|
@ -1529,17 +1529,22 @@ typedef struct SMqSetCVgReq {
|
||||||
} SMqSetCVgReq;
|
} SMqSetCVgReq;
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) {
|
static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) {
|
||||||
int32_t tlen = sizeof(SSubQueryMsg) + pMsg->contentLen;
|
int32_t tlen = 0;
|
||||||
if (buf == NULL) return tlen;
|
tlen += taosEncodeFixedU64(buf, pMsg->sId);
|
||||||
memcpy(*buf, pMsg, tlen);
|
tlen += taosEncodeFixedU64(buf, pMsg->queryId);
|
||||||
*buf = POINTER_SHIFT(*buf, tlen);
|
tlen += taosEncodeFixedU64(buf, pMsg->taskId);
|
||||||
|
tlen += taosEncodeFixedU32(buf, pMsg->contentLen);
|
||||||
|
tlen += taosEncodeBinary(buf, pMsg->msg, pMsg->contentLen);
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) {
|
static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) {
|
||||||
int32_t tlen = sizeof(SSubQueryMsg) + ((SSubQueryMsg*)buf)->contentLen;
|
buf = taosDecodeFixedU64(buf, &pMsg->sId);
|
||||||
memcpy(pMsg, buf, tlen);
|
buf = taosDecodeFixedU64(buf, &pMsg->queryId);
|
||||||
return POINTER_SHIFT(buf, tlen);
|
buf = taosDecodeFixedU64(buf, &pMsg->taskId);
|
||||||
|
buf = taosDecodeFixedU32(buf, &pMsg->contentLen);
|
||||||
|
buf = taosDecodeBinaryTo(buf, pMsg->msg, pMsg->contentLen);
|
||||||
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
|
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
|
||||||
|
|
|
@ -29,21 +29,23 @@ struct SSubplan;
|
||||||
/**
|
/**
|
||||||
* Create the exec task for streaming mode
|
* Create the exec task for streaming mode
|
||||||
* @param pMsg
|
* @param pMsg
|
||||||
* @param pStreamBlockReadHandle
|
* @param streamReadHandle
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
qTaskInfo_t createStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle);
|
qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg *pMsg, void* streamReadHandle);
|
||||||
|
|
||||||
|
int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create the exec task object according to task json
|
* Create the exec task object according to task json
|
||||||
* @param tsdb
|
* @param readHandle
|
||||||
* @param vgId
|
* @param vgId
|
||||||
* @param pTaskInfoMsg
|
* @param pTaskInfoMsg
|
||||||
* @param pTaskInfo
|
* @param pTaskInfo
|
||||||
* @param qId
|
* @param qId
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle);
|
int32_t qCreateExecTask(void* readHandle, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The main task execution function, including query on both table and multiple tables,
|
* The main task execution function, including query on both table and multiple tables,
|
||||||
|
@ -60,63 +62,63 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds);
|
||||||
* this function will be blocked to wait for the query execution completed or paused,
|
* this function will be blocked to wait for the query execution completed or paused,
|
||||||
* in which case enough results have been produced already.
|
* in which case enough results have been produced already.
|
||||||
*
|
*
|
||||||
* @param qinfo
|
* @param tinfo
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext);
|
int32_t qRetrieveQueryResultInfo(qTaskInfo_t tinfo, bool* buildRes, void* pRspContext);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
* Retrieve the actual results to fill the response message payload.
|
* Retrieve the actual results to fill the response message payload.
|
||||||
* Note that this function must be executed after qRetrieveQueryResultInfo is invoked.
|
* Note that this function must be executed after qRetrieveQueryResultInfo is invoked.
|
||||||
*
|
*
|
||||||
* @param qinfo qinfo object
|
* @param tinfo tinfo object
|
||||||
* @param pRsp response message
|
* @param pRsp response message
|
||||||
* @param contLen payload length
|
* @param contLen payload length
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
//int32_t qDumpRetrieveResult(qTaskInfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec);
|
//int32_t qDumpRetrieveResult(qTaskInfo_t tinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* return the transporter context (RPC)
|
* return the transporter context (RPC)
|
||||||
* @param qinfo
|
* @param tinfo
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
void* qGetResultRetrieveMsg(qTaskInfo_t qinfo);
|
void* qGetResultRetrieveMsg(qTaskInfo_t tinfo);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* kill the ongoing query and free the query handle and corresponding resources automatically
|
* kill the ongoing query and free the query handle and corresponding resources automatically
|
||||||
* @param qinfo qhandle
|
* @param tinfo qhandle
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t qKillTask(qTaskInfo_t qinfo);
|
int32_t qKillTask(qTaskInfo_t tinfo);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* kill the ongoing query asynchronously
|
* kill the ongoing query asynchronously
|
||||||
* @param qinfo qhandle
|
* @param tinfo qhandle
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t qAsyncKillTask(qTaskInfo_t qinfo);
|
int32_t qAsyncKillTask(qTaskInfo_t tinfo);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* return whether query is completed or not
|
* return whether query is completed or not
|
||||||
* @param qinfo
|
* @param tinfo
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t qIsTaskCompleted(qTaskInfo_t qinfo);
|
int32_t qIsTaskCompleted(qTaskInfo_t tinfo);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* destroy query info structure
|
* destroy query info structure
|
||||||
* @param qHandle
|
* @param qHandle
|
||||||
*/
|
*/
|
||||||
void qDestroyTask(qTaskInfo_t qHandle);
|
void qDestroyTask(qTaskInfo_t tinfo);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the queried table uid
|
* Get the queried table uid
|
||||||
* @param qHandle
|
* @param qHandle
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int64_t qGetQueriedTableUid(qTaskInfo_t qHandle);
|
int64_t qGetQueriedTableUid(qTaskInfo_t tinfo);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extract the qualified table id list, and than pass them to the TSDB driver to load the required table data blocks.
|
* Extract the qualified table id list, and than pass them to the TSDB driver to load the required table data blocks.
|
||||||
|
@ -143,7 +145,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
|
||||||
* @param type operation type: ADD|DROP
|
* @param type operation type: ADD|DROP
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t qUpdateQueriedTableIdList(qTaskInfo_t qinfo, int64_t uid, int32_t type);
|
int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type);
|
||||||
|
|
||||||
//================================================================================================
|
//================================================================================================
|
||||||
// query handle management
|
// query handle management
|
||||||
|
|
|
@ -372,7 +372,7 @@ static FORCE_INLINE void *taosDecodeStringTo(void *buf, char *value) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- binary
|
// ---- binary
|
||||||
static FORCE_INLINE int taosEncodeBinary(void **buf, const void *value, int valueLen) {
|
static FORCE_INLINE int taosEncodeBinary(void **buf, const void *value, int32_t valueLen) {
|
||||||
int tlen = 0;
|
int tlen = 0;
|
||||||
|
|
||||||
if (buf != NULL) {
|
if (buf != NULL) {
|
||||||
|
@ -384,14 +384,19 @@ static FORCE_INLINE int taosEncodeBinary(void **buf, const void *value, int valu
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void *taosDecodeBinary(void *buf, void **value, int valueLen) {
|
static FORCE_INLINE void *taosDecodeBinary(void *buf, void **value, int32_t valueLen) {
|
||||||
uint64_t size = 0;
|
|
||||||
|
|
||||||
*value = malloc((size_t)valueLen);
|
*value = malloc((size_t)valueLen);
|
||||||
if (*value == NULL) return NULL;
|
if (*value == NULL) return NULL;
|
||||||
memcpy(*value, buf, (size_t)size);
|
memcpy(*value, buf, (size_t)valueLen);
|
||||||
|
|
||||||
return POINTER_SHIFT(buf, size);
|
return POINTER_SHIFT(buf, valueLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void *taosDecodeBinaryTo(void *buf, void *value, int32_t valueLen) {
|
||||||
|
|
||||||
|
memcpy(value, buf, (size_t)valueLen);
|
||||||
|
return POINTER_SHIFT(buf, valueLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -196,6 +196,10 @@ static void doDestroyRequest(void* p) {
|
||||||
doFreeReqResultInfo(&pRequest->body.resInfo);
|
doFreeReqResultInfo(&pRequest->body.resInfo);
|
||||||
qDestroyQueryDag(pRequest->body.pDag);
|
qDestroyQueryDag(pRequest->body.pDag);
|
||||||
|
|
||||||
|
if (pRequest->body.showInfo.pArray != NULL) {
|
||||||
|
taosArrayDestroy(pRequest->body.showInfo.pArray);
|
||||||
|
}
|
||||||
|
|
||||||
deregisterRequest(pRequest);
|
deregisterRequest(pRequest);
|
||||||
tfree(pRequest);
|
tfree(pRequest);
|
||||||
}
|
}
|
||||||
|
|
|
@ -145,19 +145,23 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pSchema = pMetaMsg->pSchema;
|
pSchema = pMetaMsg->pSchema;
|
||||||
TAOS_FIELD* pFields = calloc(pMetaMsg->numOfColumns, sizeof(TAOS_FIELD));
|
tfree(pRequest->body.resInfo.pRspMsg);
|
||||||
for (int32_t i = 0; i < pMetaMsg->numOfColumns; ++i) {
|
|
||||||
tstrncpy(pFields[i].name, pSchema[i].name, tListLen(pFields[i].name));
|
|
||||||
pFields[i].type = pSchema[i].type;
|
|
||||||
pFields[i].bytes = pSchema[i].bytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
pRequest->body.resInfo.pRspMsg = pMsg->pData;
|
pRequest->body.resInfo.pRspMsg = pMsg->pData;
|
||||||
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
|
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
|
||||||
|
|
||||||
pResInfo->fields = pFields;
|
if (pResInfo->fields == NULL) {
|
||||||
pResInfo->numOfCols = pMetaMsg->numOfColumns;
|
TAOS_FIELD* pFields = calloc(pMetaMsg->numOfColumns, sizeof(TAOS_FIELD));
|
||||||
|
for (int32_t i = 0; i < pMetaMsg->numOfColumns; ++i) {
|
||||||
|
tstrncpy(pFields[i].name, pSchema[i].name, tListLen(pFields[i].name));
|
||||||
|
pFields[i].type = pSchema[i].type;
|
||||||
|
pFields[i].bytes = pSchema[i].bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
pResInfo->fields = pFields;
|
||||||
|
}
|
||||||
|
|
||||||
|
pResInfo->numOfCols = pMetaMsg->numOfColumns;
|
||||||
pRequest->body.showInfo.execId = pShow->showId;
|
pRequest->body.showInfo.execId = pShow->showId;
|
||||||
|
|
||||||
// todo
|
// todo
|
||||||
|
|
|
@ -452,39 +452,39 @@ TEST(testCase, driverInit_Test) {
|
||||||
//
|
//
|
||||||
// taos_close(pConn);
|
// taos_close(pConn);
|
||||||
//}
|
//}
|
||||||
//
|
|
||||||
//TEST(testCase, show_table_Test) {
|
TEST(testCase, show_table_Test) {
|
||||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
// assert(pConn != NULL);
|
assert(pConn != NULL);
|
||||||
//
|
|
||||||
// TAOS_RES* pRes = taos_query(pConn, "show tables");
|
TAOS_RES* pRes = taos_query(pConn, "show tables");
|
||||||
// if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
// printf("failed to show tables, reason:%s\n", taos_errstr(pRes));
|
printf("failed to show tables, reason:%s\n", taos_errstr(pRes));
|
||||||
// taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
// }
|
}
|
||||||
//
|
|
||||||
// pRes = taos_query(pConn, "show abc1.tables");
|
pRes = taos_query(pConn, "show abc1.tables");
|
||||||
// if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
// printf("failed to show tables, reason:%s\n", taos_errstr(pRes));
|
printf("failed to show tables, reason:%s\n", taos_errstr(pRes));
|
||||||
// taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
// }
|
}
|
||||||
//
|
|
||||||
// TAOS_ROW pRow = NULL;
|
TAOS_ROW pRow = NULL;
|
||||||
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||||
// int32_t numOfFields = taos_num_fields(pRes);
|
int32_t numOfFields = taos_num_fields(pRes);
|
||||||
//
|
|
||||||
// int32_t count = 0;
|
int32_t count = 0;
|
||||||
// char str[512] = {0};
|
char str[512] = {0};
|
||||||
//
|
|
||||||
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
|
while ((pRow = taos_fetch_row(pRes)) != NULL) {
|
||||||
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
||||||
// printf("%d: %s\n", ++count, str);
|
printf("%d: %s\n", ++count, str);
|
||||||
// }
|
}
|
||||||
//
|
|
||||||
// taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
// taos_close(pConn);
|
taos_close(pConn);
|
||||||
//}
|
}
|
||||||
//
|
|
||||||
//TEST(testCase, drop_stable_Test) {
|
//TEST(testCase, drop_stable_Test) {
|
||||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
// assert(pConn != NULL);
|
// assert(pConn != NULL);
|
||||||
|
@ -526,29 +526,29 @@ TEST(testCase, driverInit_Test) {
|
||||||
// taosHashCleanup(phash);
|
// taosHashCleanup(phash);
|
||||||
//}
|
//}
|
||||||
//
|
//
|
||||||
TEST(testCase, create_topic_Test) {
|
//TEST(testCase, create_topic_Test) {
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
assert(pConn != NULL);
|
// assert(pConn != NULL);
|
||||||
|
//
|
||||||
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||||
if (taos_errno(pRes) != 0) {
|
// if (taos_errno(pRes) != 0) {
|
||||||
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
// printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||||
}
|
// }
|
||||||
taos_free_result(pRes);
|
// taos_free_result(pRes);
|
||||||
|
//
|
||||||
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||||
ASSERT_TRUE(pFields == nullptr);
|
// ASSERT_TRUE(pFields == nullptr);
|
||||||
|
//
|
||||||
int32_t numOfFields = taos_num_fields(pRes);
|
// int32_t numOfFields = taos_num_fields(pRes);
|
||||||
ASSERT_EQ(numOfFields, 0);
|
// ASSERT_EQ(numOfFields, 0);
|
||||||
|
//
|
||||||
taos_free_result(pRes);
|
// taos_free_result(pRes);
|
||||||
|
//
|
||||||
char* sql = "select * from tu";
|
// char* sql = "select * from tu";
|
||||||
pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
|
// pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
|
||||||
taos_free_result(pRes);
|
// taos_free_result(pRes);
|
||||||
taos_close(pConn);
|
// taos_close(pConn);
|
||||||
}
|
//}
|
||||||
|
|
||||||
//TEST(testCase, insert_test) {
|
//TEST(testCase, insert_test) {
|
||||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
|
|
@ -127,7 +127,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
|
SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
|
||||||
|
|
||||||
pTopic->sql = calloc(pTopic->sqlLen + 1, sizeof(char));
|
pTopic->sql = calloc(pTopic->sqlLen + 1, sizeof(char));
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
|
SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
|
||||||
|
|
||||||
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
|
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
|
||||||
pTopic->logicalPlan = calloc(len + 1, sizeof(char));
|
pTopic->logicalPlan = calloc(len + 1, sizeof(char));
|
||||||
|
|
|
@ -17,11 +17,12 @@
|
||||||
#define _TD_TQ_H_
|
#define _TD_TQ_H_
|
||||||
|
|
||||||
#include "common.h"
|
#include "common.h"
|
||||||
|
#include "executor.h"
|
||||||
|
#include "vnode.h"
|
||||||
#include "mallocator.h"
|
#include "mallocator.h"
|
||||||
#include "meta.h"
|
#include "meta.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "scheduler.h"
|
#include "scheduler.h"
|
||||||
#include "executor.h"
|
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "tlist.h"
|
#include "tlist.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
|
@ -82,27 +83,12 @@ typedef struct STqSubscribeReq {
|
||||||
int64_t topic[];
|
int64_t topic[];
|
||||||
} STqSubscribeReq;
|
} STqSubscribeReq;
|
||||||
|
|
||||||
typedef struct STqSubscribeRsp {
|
|
||||||
STqMsgHead head;
|
|
||||||
int64_t vgId;
|
|
||||||
char ep[TSDB_EP_LEN]; // TSDB_EP_LEN
|
|
||||||
} STqSubscribeRsp;
|
|
||||||
|
|
||||||
typedef struct STqHeartbeatReq {
|
typedef struct STqHeartbeatReq {
|
||||||
} STqHeartbeatReq;
|
} STqHeartbeatReq;
|
||||||
|
|
||||||
typedef struct STqHeartbeatRsp {
|
typedef struct STqHeartbeatRsp {
|
||||||
} STqHeartbeatRsp;
|
} STqHeartbeatRsp;
|
||||||
|
|
||||||
typedef struct STqTopicVhandle {
|
|
||||||
int64_t topicId;
|
|
||||||
// executor for filter
|
|
||||||
void* filterExec;
|
|
||||||
// callback for mnode
|
|
||||||
// trigger when vnode list associated topic change
|
|
||||||
void* (*mCallback)(void*, void*);
|
|
||||||
} STqTopicVhandle;
|
|
||||||
|
|
||||||
#define TQ_BUFFER_SIZE 8
|
#define TQ_BUFFER_SIZE 8
|
||||||
|
|
||||||
typedef struct STqExec {
|
typedef struct STqExec {
|
||||||
|
@ -163,10 +149,10 @@ typedef struct STqGroup {
|
||||||
} STqGroup;
|
} STqGroup;
|
||||||
|
|
||||||
typedef struct STqTaskItem {
|
typedef struct STqTaskItem {
|
||||||
int8_t status;
|
int8_t status;
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
void* dst;
|
void* dst;
|
||||||
qTaskInfo_t task;
|
qTaskInfo_t task;
|
||||||
} STqTaskItem;
|
} STqTaskItem;
|
||||||
|
|
||||||
// new version
|
// new version
|
||||||
|
@ -199,10 +185,6 @@ typedef struct STqQueryMsg {
|
||||||
struct STqQueryMsg* next;
|
struct STqQueryMsg* next;
|
||||||
} STqQueryMsg;
|
} STqQueryMsg;
|
||||||
|
|
||||||
typedef struct STqCfg {
|
|
||||||
// TODO
|
|
||||||
} STqCfg;
|
|
||||||
|
|
||||||
typedef struct STqMemRef {
|
typedef struct STqMemRef {
|
||||||
SMemAllocatorFactory* pAllocatorFactory;
|
SMemAllocatorFactory* pAllocatorFactory;
|
||||||
SMemAllocator* pAllocator;
|
SMemAllocator* pAllocator;
|
||||||
|
@ -299,6 +281,7 @@ typedef struct STQ {
|
||||||
STqMemRef tqMemRef;
|
STqMemRef tqMemRef;
|
||||||
STqMetaStore* tqMeta;
|
STqMetaStore* tqMeta;
|
||||||
SWal* pWal;
|
SWal* pWal;
|
||||||
|
SMeta* pMeta;
|
||||||
} STQ;
|
} STQ;
|
||||||
|
|
||||||
typedef struct STqMgmt {
|
typedef struct STqMgmt {
|
||||||
|
@ -313,7 +296,7 @@ int tqInit();
|
||||||
void tqCleanUp();
|
void tqCleanUp();
|
||||||
|
|
||||||
// open in each vnode
|
// open in each vnode
|
||||||
STQ* tqOpen(const char* path, SWal* pWal, STqCfg* tqConfig, SMemAllocatorFactory* allocFac);
|
STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac);
|
||||||
void tqClose(STQ*);
|
void tqClose(STQ*);
|
||||||
|
|
||||||
// void* will be replace by a msg type
|
// void* will be replace by a msg type
|
||||||
|
@ -337,23 +320,6 @@ int tqSendLaunchQuery(STqMsgItem*, int64_t offset);
|
||||||
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp);
|
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp);
|
||||||
int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq);
|
int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq);
|
||||||
|
|
||||||
typedef struct STqReadHandle {
|
|
||||||
int64_t ver;
|
|
||||||
SSubmitMsg* pMsg;
|
|
||||||
SSubmitBlk* pBlock;
|
|
||||||
SSubmitMsgIter msgIter;
|
|
||||||
SSubmitBlkIter blkIter;
|
|
||||||
SMeta* pMeta;
|
|
||||||
SArray* pColumnIdList;
|
|
||||||
} STqReadHandle;
|
|
||||||
|
|
||||||
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SArray* pColumnIdList);
|
|
||||||
void tqReadHandleSetMsg(STqReadHandle* pHandle, SSubmitMsg* pMsg, int64_t ver);
|
|
||||||
bool tqNextDataBlock(STqReadHandle* pHandle);
|
|
||||||
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo);
|
|
||||||
// return SArray<SColumnInfoData>
|
|
||||||
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -22,7 +22,6 @@
|
||||||
#include "meta.h"
|
#include "meta.h"
|
||||||
#include "tarray.h"
|
#include "tarray.h"
|
||||||
#include "tfs.h"
|
#include "tfs.h"
|
||||||
#include "tq.h"
|
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
#include "wal.h"
|
#include "wal.h"
|
||||||
|
|
||||||
|
@ -35,6 +34,12 @@ typedef struct SVnode SVnode;
|
||||||
typedef struct SDnode SDnode;
|
typedef struct SDnode SDnode;
|
||||||
typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq);
|
typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq);
|
||||||
|
|
||||||
|
typedef struct STqCfg {
|
||||||
|
// TODO
|
||||||
|
int32_t reserved;
|
||||||
|
} STqCfg;
|
||||||
|
|
||||||
|
|
||||||
typedef struct SVnodeCfg {
|
typedef struct SVnodeCfg {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
SDnode *pDnode;
|
SDnode *pDnode;
|
||||||
|
@ -61,6 +66,16 @@ typedef struct {
|
||||||
PutReqToVQueryQFp putReqToVQueryQFp;
|
PutReqToVQueryQFp putReqToVQueryQFp;
|
||||||
} SVnodeOpt;
|
} SVnodeOpt;
|
||||||
|
|
||||||
|
typedef struct STqReadHandle {
|
||||||
|
int64_t ver;
|
||||||
|
SSubmitMsg* pMsg;
|
||||||
|
SSubmitBlk* pBlock;
|
||||||
|
SSubmitMsgIter msgIter;
|
||||||
|
SSubmitBlkIter blkIter;
|
||||||
|
SMeta* pMeta;
|
||||||
|
SArray* pColumnIdList;
|
||||||
|
} STqReadHandle;
|
||||||
|
|
||||||
/* ------------------------ SVnode ------------------------ */
|
/* ------------------------ SVnode ------------------------ */
|
||||||
/**
|
/**
|
||||||
* @brief Initialize the vnode module
|
* @brief Initialize the vnode module
|
||||||
|
@ -180,6 +195,21 @@ int32_t vnodeCompact(SVnode *pVnode);
|
||||||
int32_t vnodeSync(SVnode *pVnode);
|
int32_t vnodeSync(SVnode *pVnode);
|
||||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
||||||
|
|
||||||
|
/* ------------------------- TQ QUERY -------------------------- */
|
||||||
|
|
||||||
|
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta);
|
||||||
|
|
||||||
|
static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColumnIdList) {
|
||||||
|
pReadHandle->pColumnIdList = pColumnIdList;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tqReadHandleSetMsg(STqReadHandle* pHandle, SSubmitMsg* pMsg, int64_t ver);
|
||||||
|
bool tqNextDataBlock(STqReadHandle* pHandle);
|
||||||
|
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo);
|
||||||
|
// return SArray<SColumnInfoData>
|
||||||
|
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle);
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#define _TD_TQ_INT_H_
|
#define _TD_TQ_INT_H_
|
||||||
|
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
|
#include "meta.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
#include "tlockfree.h"
|
#include "tlockfree.h"
|
||||||
#include "tmacro.h"
|
#include "tmacro.h"
|
||||||
#include "wal.h"
|
#include "wal.h"
|
||||||
|
#include "tq.h"
|
||||||
|
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
|
|
||||||
|
|
|
@ -50,7 +50,7 @@ void tqCleanUp() {
|
||||||
taosTmrCleanUp(tqMgmt.timer);
|
taosTmrCleanUp(tqMgmt.timer);
|
||||||
}
|
}
|
||||||
|
|
||||||
STQ* tqOpen(const char* path, SWal* pWal, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) {
|
STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) {
|
||||||
STQ* pTq = malloc(sizeof(STQ));
|
STQ* pTq = malloc(sizeof(STQ));
|
||||||
if (pTq == NULL) {
|
if (pTq == NULL) {
|
||||||
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
|
||||||
|
@ -58,6 +58,8 @@ STQ* tqOpen(const char* path, SWal* pWal, STqCfg* tqConfig, SMemAllocatorFactory
|
||||||
}
|
}
|
||||||
pTq->path = strdup(path);
|
pTq->path = strdup(path);
|
||||||
pTq->tqConfig = tqConfig;
|
pTq->tqConfig = tqConfig;
|
||||||
|
pTq->pWal = pWal;
|
||||||
|
pTq->pMeta = pMeta;
|
||||||
#if 0
|
#if 0
|
||||||
pTq->tqMemRef.pAllocatorFactory = allocFac;
|
pTq->tqMemRef.pAllocatorFactory = allocFac;
|
||||||
pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
|
pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
|
||||||
|
@ -610,44 +612,52 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
|
||||||
SMqCVConsumeReq* pReq = pMsg->pCont;
|
SMqCVConsumeReq* pReq = pMsg->pCont;
|
||||||
int64_t reqId = pReq->reqId;
|
int64_t reqId = pReq->reqId;
|
||||||
int64_t consumerId = pReq->consumerId;
|
int64_t consumerId = pReq->consumerId;
|
||||||
int64_t offset = pReq->offset;
|
int64_t reqOffset = pReq->offset;
|
||||||
|
int64_t fetchOffset = reqOffset;
|
||||||
int64_t blockingTime = pReq->blockingTime;
|
int64_t blockingTime = pReq->blockingTime;
|
||||||
|
|
||||||
STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
|
STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
|
||||||
int sz = taosArrayGetSize(pConsumer->topics);
|
int sz = taosArrayGetSize(pConsumer->topics);
|
||||||
|
|
||||||
for (int i = 0 ; i < sz; i++) {
|
for (int i = 0 ; i < sz; i++) {
|
||||||
STqTopicHandle *pHandle = taosArrayGet(pConsumer->topics, i);
|
STqTopicHandle *pTopic = taosArrayGet(pConsumer->topics, i);
|
||||||
|
|
||||||
int8_t pos = offset % TQ_BUFFER_SIZE;
|
int8_t pos = fetchOffset % TQ_BUFFER_SIZE;
|
||||||
int8_t old = atomic_val_compare_exchange_8(&pHandle->buffer.output[pos].status, 0, 1);
|
int8_t old = atomic_val_compare_exchange_8(&pTopic->buffer.output[pos].status, 0, 1);
|
||||||
if (old == 1) {
|
if (old == 1) {
|
||||||
// do nothing
|
// do nothing
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (walReadWithHandle(pHandle->pReadhandle, offset) < 0) {
|
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
|
||||||
// TODO
|
return -1;
|
||||||
}
|
}
|
||||||
SWalHead* pHead = pHandle->pReadhandle->pHead;
|
SWalHead* pHead = pTopic->pReadhandle->pHead;
|
||||||
while (pHead->head.msgType != TDMT_VND_SUBMIT) {
|
while (1) {
|
||||||
// read until find TDMT_VND_SUBMIT
|
// read until find TDMT_VND_SUBMIT
|
||||||
|
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
|
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
|
||||||
|
void* task = pTopic->buffer.output[pos].task;
|
||||||
|
|
||||||
/*SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg;*/
|
qSetStreamInput(task, pCont);
|
||||||
|
SSDataBlock* pDataBlock;
|
||||||
|
uint64_t ts;
|
||||||
|
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
||||||
|
|
||||||
|
}
|
||||||
// TODO: launch query and get output data
|
// TODO: launch query and get output data
|
||||||
void* outputData;
|
pTopic->buffer.output[pos].dst = pDataBlock;
|
||||||
pHandle->buffer.output[pos].dst = outputData;
|
if (pTopic->buffer.firstOffset == -1
|
||||||
if (pHandle->buffer.firstOffset == -1
|
|| pReq->offset < pTopic->buffer.firstOffset) {
|
||||||
|| pReq->offset < pHandle->buffer.firstOffset) {
|
pTopic->buffer.firstOffset = pReq->offset;
|
||||||
pHandle->buffer.firstOffset = pReq->offset;
|
|
||||||
}
|
}
|
||||||
if (pHandle->buffer.lastOffset == -1
|
if (pTopic->buffer.lastOffset == -1
|
||||||
|| pReq->offset > pHandle->buffer.lastOffset) {
|
|| pReq->offset > pTopic->buffer.lastOffset) {
|
||||||
pHandle->buffer.lastOffset = pReq->offset;
|
pTopic->buffer.lastOffset = pReq->offset;
|
||||||
}
|
}
|
||||||
atomic_store_8(&pHandle->buffer.output[pos].status, 1);
|
atomic_store_8(&pTopic->buffer.output[pos].status, 1);
|
||||||
|
|
||||||
// put output into rsp
|
// put output into rsp
|
||||||
}
|
}
|
||||||
|
@ -674,29 +684,23 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) {
|
||||||
strcpy(pTopic->sql, pReq->sql);
|
strcpy(pTopic->sql, pReq->sql);
|
||||||
strcpy(pTopic->logicalPlan, pReq->logicalPlan);
|
strcpy(pTopic->logicalPlan, pReq->logicalPlan);
|
||||||
strcpy(pTopic->physicalPlan, pReq->physicalPlan);
|
strcpy(pTopic->physicalPlan, pReq->physicalPlan);
|
||||||
SArray *pArray;
|
|
||||||
//TODO: deserialize to SQueryDag
|
|
||||||
SQueryDag *pDag;
|
|
||||||
// convert to task
|
|
||||||
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
|
|
||||||
// TODO: handle error
|
|
||||||
}
|
|
||||||
STaskInfo *pInfo = taosArrayGet(pArray, 0);
|
|
||||||
SArray* pTasks;
|
|
||||||
schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE);
|
|
||||||
pTopic->buffer.firstOffset = -1;
|
pTopic->buffer.firstOffset = -1;
|
||||||
pTopic->buffer.lastOffset = -1;
|
pTopic->buffer.lastOffset = -1;
|
||||||
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
|
|
||||||
SSubQueryMsg* pMsg = taosArrayGet(pTasks, i);
|
|
||||||
pTopic->buffer.output[i].status = 0;
|
|
||||||
pTopic->buffer.output[i].task = createStreamExecTaskInfo(pMsg, NULL);
|
|
||||||
}
|
|
||||||
pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
|
pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
|
||||||
|
if (pTopic->pReadhandle == NULL) {
|
||||||
|
|
||||||
|
}
|
||||||
|
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
|
||||||
|
pTopic->buffer.output[i].status = 0;
|
||||||
|
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
|
||||||
|
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&pReq->msg, pReadHandle);
|
||||||
|
}
|
||||||
// write mq meta
|
// write mq meta
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SArray* pColumnIdList) {
|
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
|
||||||
STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle));
|
STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle));
|
||||||
if (pReadHandle == NULL) {
|
if (pReadHandle == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -704,7 +708,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SArray* pColumnIdList) {
|
||||||
pReadHandle->pMeta = pMeta;
|
pReadHandle->pMeta = pMeta;
|
||||||
pReadHandle->pMsg = NULL;
|
pReadHandle->pMsg = NULL;
|
||||||
pReadHandle->ver = -1;
|
pReadHandle->ver = -1;
|
||||||
pReadHandle->pColumnIdList = pColumnIdList;
|
pReadHandle->pColumnIdList = NULL;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -127,7 +127,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
|
||||||
|
|
||||||
// Open TQ
|
// Open TQ
|
||||||
sprintf(dir, "%s/tq", pVnode->path);
|
sprintf(dir, "%s/tq", pVnode->path);
|
||||||
pVnode->pTq = tqOpen(dir, pVnode->pWal, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode));
|
pVnode->pTq = tqOpen(dir, pVnode->pWal, pVnode->pMeta, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode));
|
||||||
if (pVnode->pTq == NULL) {
|
if (pVnode->pTq == NULL) {
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -83,8 +83,8 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i);
|
SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i);
|
||||||
if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) {
|
if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) {
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
|
vError("vgId:%d, failed to create table: %s", pVnode->vgId, pCreateTbReq->name);
|
||||||
}
|
}
|
||||||
vTrace("vgId:%d process create table %s", pVnode->vgId, pCreateTbReq->name);
|
|
||||||
free(pCreateTbReq->name);
|
free(pCreateTbReq->name);
|
||||||
if (pCreateTbReq->type == TD_SUPER_TABLE) {
|
if (pCreateTbReq->type == TD_SUPER_TABLE) {
|
||||||
free(pCreateTbReq->stbCfg.pSchema);
|
free(pCreateTbReq->stbCfg.pSchema);
|
||||||
|
@ -95,6 +95,8 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
free(pCreateTbReq->ntbCfg.pSchema);
|
free(pCreateTbReq->ntbCfg.pSchema);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
vInfo("vgId:%d process create %"PRIzu" tables", pVnode->vgId, taosArrayGetSize(vCreateTbBatchReq.pArray));
|
||||||
taosArrayDestroy(vCreateTbBatchReq.pArray);
|
taosArrayDestroy(vCreateTbBatchReq.pArray);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
|
|
@ -13,29 +13,75 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "planner.h"
|
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
|
#include "tq.h"
|
||||||
|
#include "executorimpl.h"
|
||||||
|
#include "planner.h"
|
||||||
|
|
||||||
qTaskInfo_t createStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle) {
|
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, uint64_t reqId) {
|
||||||
if (pMsg == NULL || pStreamBlockReadHandle == NULL) {
|
ASSERT(pOperator != NULL);
|
||||||
|
if (pOperator->operatorType != OP_StreamScan) {
|
||||||
|
if (pOperator->numOfDownstream == 0) {
|
||||||
|
qError("failed to find stream scan operator to set the input data block, reqId:0x%" PRIx64, reqId);
|
||||||
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pOperator->numOfDownstream > 1) { // not handle this in join query
|
||||||
|
qError("join not supported for stream block scan, reqId:0x%" PRIx64, reqId);
|
||||||
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
return doSetStreamBlock(pOperator->pDownstream[0], input, reqId);
|
||||||
|
} else {
|
||||||
|
SStreamBlockScanInfo* pInfo = pOperator->info;
|
||||||
|
tqReadHandleSetMsg(pInfo->readerHandle, input, 0);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) {
|
||||||
|
if (tinfo == NULL) {
|
||||||
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (input == NULL) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo;
|
||||||
|
|
||||||
|
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, input, GET_TASKID(pTaskInfo));
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("failed to set the stream block data, reqId:0x%"PRIx64, GET_TASKID(pTaskInfo));
|
||||||
|
} else {
|
||||||
|
qDebug("set the stream block successfully, reqId:0x%"PRIx64, GET_TASKID(pTaskInfo));
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* streamReadHandle) {
|
||||||
|
if (pMsg == NULL || streamReadHandle == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// print those info into log
|
// print those info into log
|
||||||
pMsg->sId = be64toh(pMsg->sId);
|
#if 0
|
||||||
pMsg->queryId = be64toh(pMsg->queryId);
|
pMsg->sId = pMsg->sId;
|
||||||
pMsg->taskId = be64toh(pMsg->taskId);
|
pMsg->queryId = pMsg->queryId;
|
||||||
pMsg->contentLen = ntohl(pMsg->contentLen);
|
pMsg->taskId = pMsg->taskId;
|
||||||
|
pMsg->contentLen = pMsg->contentLen;
|
||||||
|
#endif
|
||||||
|
|
||||||
struct SSubplan *plan = NULL;
|
struct SSubplan* plan = NULL;
|
||||||
int32_t code = qStringToSubplan(pMsg->msg, &plan);
|
int32_t code = qStringToSubplan(pMsg->msg, &plan);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
qTaskInfo_t pTaskInfo = NULL;
|
qTaskInfo_t pTaskInfo = NULL;
|
||||||
code = qCreateExecTask(pStreamBlockReadHandle, 0, plan, &pTaskInfo, NULL);
|
code = qCreateExecTask(streamReadHandle, 0, plan, &pTaskInfo, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
// TODO: destroy SSubplan & pTaskInfo
|
// TODO: destroy SSubplan & pTaskInfo
|
||||||
terrno = code;
|
terrno = code;
|
||||||
|
|
|
@ -27,6 +27,7 @@
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
#include "ttypes.h"
|
#include "ttypes.h"
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
|
#include "vnode.h"
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
#define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN)
|
#define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN)
|
||||||
|
@ -5407,7 +5408,7 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt
|
||||||
return pOperator;
|
return pOperator;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createStreamBlockScanOperatorInfo(void *pStreamBlockHandle, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
|
SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) {
|
||||||
SStreamBlockScanInfo* pInfo = calloc(1, sizeof(SStreamBlockScanInfo));
|
SStreamBlockScanInfo* pInfo = calloc(1, sizeof(SStreamBlockScanInfo));
|
||||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
@ -5417,16 +5418,28 @@ SOperatorInfo* createStreamBlockScanOperatorInfo(void *pStreamBlockHandle, int32
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->readerHandle = pStreamBlockHandle;
|
int32_t numOfOutput = (int32_t) taosArrayGetSize(pExprInfo);
|
||||||
|
SArray* pColList = taosArrayInit(numOfOutput, sizeof(int32_t));
|
||||||
|
for(int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
|
SExprInfo* pExpr = taosArrayGetP(pExprInfo, i);
|
||||||
|
|
||||||
|
taosArrayPush(pColList, &pExpr->pExpr->pSchema[0].colId);
|
||||||
|
}
|
||||||
|
|
||||||
|
// set the extract column id to streamHandle
|
||||||
|
tqReadHandleSetColIdList((STqReadHandle* )streamReadHandle, pColList);
|
||||||
|
|
||||||
|
pInfo->readerHandle = streamReadHandle;
|
||||||
|
|
||||||
pOperator->name = "StreamBlockScanOperator";
|
pOperator->name = "StreamBlockScanOperator";
|
||||||
pOperator->operatorType = OP_StreamBlockScan;
|
pOperator->operatorType = OP_StreamScan;
|
||||||
pOperator->blockingOptr = false;
|
pOperator->blockingOptr = false;
|
||||||
pOperator->status = OP_IN_EXECUTING;
|
pOperator->status = OP_IN_EXECUTING;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->numOfOutput = numOfOutput;
|
pOperator->numOfOutput = numOfOutput;
|
||||||
pOperator->exec = doStreamBlockScan;
|
pOperator->exec = doStreamBlockScan;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
return pOperator;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -7704,6 +7717,9 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
|
||||||
} else if (pPhyNode->info.type == OP_Exchange) {
|
} else if (pPhyNode->info.type == OP_Exchange) {
|
||||||
SExchangePhyNode* pEx = (SExchangePhyNode*) pPhyNode;
|
SExchangePhyNode* pEx = (SExchangePhyNode*) pPhyNode;
|
||||||
return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo);
|
return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo);
|
||||||
|
} else if (pPhyNode->info.type == OP_StreamScan) {
|
||||||
|
size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets);
|
||||||
|
return createStreamScanOperatorInfo(readerHandle, pPhyNode->pTargets, pTaskInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -62,9 +62,8 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseContext* pCtx, void** out
|
||||||
pEpSet->port[i] = info->epAddr[i].port;
|
pEpSet->port[i] = info->epAddr[i].port;
|
||||||
}
|
}
|
||||||
|
|
||||||
*outputLen = sizeof(SVShowTablesReq);
|
*outputLen = sizeof(SVShowTablesReq);
|
||||||
*output = pShowReq;
|
*output = pShowReq;
|
||||||
|
|
||||||
*pExtension = array;
|
*pExtension = array;
|
||||||
} else {
|
} else {
|
||||||
if (showType == TSDB_MGMT_TABLE_STB || showType == TSDB_MGMT_TABLE_VGROUP) {
|
if (showType == TSDB_MGMT_TABLE_STB || showType == TSDB_MGMT_TABLE_VGROUP) {
|
||||||
|
|
|
@ -36,25 +36,29 @@ bool qIsDdlQuery(const SQueryNode* pQueryNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
|
int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
SSqlInfo info = doGenerateAST(pCxt->pSql);
|
SSqlInfo info = doGenerateAST(pCxt->pSql);
|
||||||
if (!info.valid) {
|
if (!info.valid) {
|
||||||
strncpy(pCxt->pMsg, info.msg, pCxt->msgLen);
|
strncpy(pCxt->pMsg, info.msg, pCxt->msgLen);
|
||||||
terrno = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
|
code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
|
||||||
return terrno;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!isDqlSqlStatement(&info)) {
|
if (!isDqlSqlStatement(&info)) {
|
||||||
if (info.type == TSDB_SQL_CREATE_TABLE) {
|
if (info.type == TSDB_SQL_CREATE_TABLE) {
|
||||||
SVnodeModifOpStmtInfo * pModifStmtInfo = qParserValidateCreateTbSqlNode(&info, pCxt, pCxt->pMsg, pCxt->msgLen);
|
SVnodeModifOpStmtInfo * pModifStmtInfo = qParserValidateCreateTbSqlNode(&info, pCxt, pCxt->pMsg, pCxt->msgLen);
|
||||||
if (pModifStmtInfo == NULL) {
|
if (pModifStmtInfo == NULL) {
|
||||||
return terrno;
|
code = terrno;
|
||||||
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
*pQuery = (SQueryNode*)pModifStmtInfo;
|
*pQuery = (SQueryNode*)pModifStmtInfo;
|
||||||
} else {
|
} else {
|
||||||
SDclStmtInfo* pDcl = qParserValidateDclSqlNode(&info, pCxt, pCxt->pMsg, pCxt->msgLen);
|
SDclStmtInfo* pDcl = qParserValidateDclSqlNode(&info, pCxt, pCxt->pMsg, pCxt->msgLen);
|
||||||
if (pDcl == NULL) {
|
if (pDcl == NULL) {
|
||||||
return terrno;
|
code = terrno;
|
||||||
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
*pQuery = (SQueryNode*)pDcl;
|
*pQuery = (SQueryNode*)pDcl;
|
||||||
|
@ -63,21 +67,22 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
|
||||||
} else {
|
} else {
|
||||||
SQueryStmtInfo* pQueryInfo = createQueryInfo();
|
SQueryStmtInfo* pQueryInfo = createQueryInfo();
|
||||||
if (pQueryInfo == NULL) {
|
if (pQueryInfo == NULL) {
|
||||||
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; // set correct error code.
|
code = TSDB_CODE_QRY_OUT_OF_MEMORY; // set correct error code.
|
||||||
return terrno;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = qParserValidateSqlNode(pCxt, &info, pQueryInfo, pCxt->pMsg, pCxt->msgLen);
|
code = qParserValidateSqlNode(pCxt, &info, pQueryInfo, pCxt->pMsg, pCxt->msgLen);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
*pQuery = (SQueryNode*)pQueryInfo;
|
*pQuery = (SQueryNode*)pQueryInfo;
|
||||||
} else {
|
} else {
|
||||||
terrno = code;
|
goto _end;
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_end:
|
||||||
destroySqlInfo(&info);
|
destroySqlInfo(&info);
|
||||||
return TSDB_CODE_SUCCESS;
|
terrno = code;
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qParseQuerySql(SParseContext* pCxt, SQueryNode** pQueryNode) {
|
int32_t qParseQuerySql(SParseContext* pCxt, SQueryNode** pQueryNode) {
|
||||||
|
@ -247,5 +252,6 @@ void qDestroyQuery(SQueryNode* pQueryNode) {
|
||||||
SVnodeModifOpStmtInfo* pModifInfo = (SVnodeModifOpStmtInfo*)pQueryNode;
|
SVnodeModifOpStmtInfo* pModifInfo = (SVnodeModifOpStmtInfo*)pQueryNode;
|
||||||
taosArrayDestroy(pModifInfo->pDataBlocks);
|
taosArrayDestroy(pModifInfo->pDataBlocks);
|
||||||
}
|
}
|
||||||
|
|
||||||
tfree(pQueryNode);
|
tfree(pQueryNode);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1485,11 +1485,11 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
|
||||||
|
|
||||||
pMsg->header.vgId = htonl(tInfo.addr.nodeId);
|
pMsg->header.vgId = htonl(tInfo.addr.nodeId);
|
||||||
|
|
||||||
pMsg->sId = htobe64(schMgmt.sId);
|
pMsg->sId = schMgmt.sId;
|
||||||
pMsg->queryId = htobe64(plan->id.queryId);
|
pMsg->queryId = plan->id.queryId;
|
||||||
pMsg->taskId = htobe64(schGenUUID());
|
pMsg->taskId = schGenUUID();
|
||||||
pMsg->taskType = TASK_TYPE_PERSISTENT;
|
pMsg->taskType = TASK_TYPE_PERSISTENT;
|
||||||
pMsg->contentLen = htonl(msgLen);
|
pMsg->contentLen = msgLen;
|
||||||
memcpy(pMsg->msg, msg, msgLen);
|
memcpy(pMsg->msg, msg, msgLen);
|
||||||
|
|
||||||
tInfo.msg = pMsg;
|
tInfo.msg = pMsg;
|
||||||
|
|
|
@ -118,7 +118,11 @@ int32_t tfsAllocDisk(STfs *pTfs, int32_t expLevel, SDiskID *pDiskId) {
|
||||||
pDiskId->id = -1;
|
pDiskId->id = -1;
|
||||||
|
|
||||||
if (pDiskId->level >= pTfs->nlevel) {
|
if (pDiskId->level >= pTfs->nlevel) {
|
||||||
pDiskId->level--;
|
pDiskId->level = pTfs->nlevel - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pDiskId->level < 0) {
|
||||||
|
pDiskId->level = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (pDiskId->level >= 0) {
|
while (pDiskId->level >= 0) {
|
||||||
|
@ -289,7 +293,7 @@ int32_t tfsRename(STfs *pTfs, char *orname, char *nrname) {
|
||||||
STfsDisk *pDisk = pTier->disks[id];
|
STfsDisk *pDisk = pTier->disks[id];
|
||||||
snprintf(oaname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, orname);
|
snprintf(oaname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, orname);
|
||||||
snprintf(naname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, nrname);
|
snprintf(naname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, nrname);
|
||||||
if (taosRenameFile(oaname, naname) != 0) {
|
if (taosRenameFile(oaname, naname) != 0 && errno != ENOENT) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
fError("failed to rename %s to %s since %s", oaname, naname, terrstr());
|
fError("failed to rename %s to %s since %s", oaname, naname, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -69,7 +69,7 @@ STfsDisk *tfsMountDiskToTier(STfsTier *pTier, SDiskCfg *pCfg) {
|
||||||
pTier->disks[id] = pDisk;
|
pTier->disks[id] = pDisk;
|
||||||
pTier->ndisk++;
|
pTier->ndisk++;
|
||||||
|
|
||||||
fInfo("disk %s is mounted to tier level %d id %d", pCfg->dir, pCfg->level, id);
|
fDebug("disk %s is mounted to tier level %d id %d", pCfg->dir, pCfg->level, id);
|
||||||
return pTier->disks[id];
|
return pTier->disks[id];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -246,29 +246,13 @@ TEST_F(TfsTest, 04_File) {
|
||||||
snprintf(fulldir, 128, "%s%s%s", root, TD_DIRSEP, "t3");
|
snprintf(fulldir, 128, "%s%s%s", root, TD_DIRSEP, "t3");
|
||||||
EXPECT_STREQ(dir, fulldir);
|
EXPECT_STREQ(dir, fulldir);
|
||||||
|
|
||||||
EXPECT_NE(tfsCopyFile(&f1, &f2), 0);
|
EXPECT_GT(tfsCopyFile(&f1, &f2), 0);
|
||||||
|
|
||||||
char af2[128] = {0};
|
char af2[128] = {0};
|
||||||
snprintf(af2, 128, "%s%s%s", root, TD_DIRSEP, n2);
|
snprintf(af2, 128, "%s%s%s", root, TD_DIRSEP, n2);
|
||||||
EXPECT_EQ(taosDirExist(af2), 0);
|
EXPECT_EQ(taosDirExist(af2), 0);
|
||||||
tfsRemoveFile(&f2);
|
tfsRemoveFile(&f2);
|
||||||
EXPECT_NE(taosDirExist(af2), 0);
|
EXPECT_NE(taosDirExist(af2), 0);
|
||||||
EXPECT_NE(tfsCopyFile(&f1, &f2), 0);
|
|
||||||
|
|
||||||
{
|
|
||||||
STfsDir *pDir = tfsOpendir(pTfs, "");
|
|
||||||
|
|
||||||
const STfsFile *pf1 = tfsReaddir(pDir);
|
|
||||||
EXPECT_STREQ(pf1->rname, "t3");
|
|
||||||
EXPECT_EQ(pf1->did.id, 0);
|
|
||||||
EXPECT_EQ(pf1->did.level, 0);
|
|
||||||
EXPECT_EQ(pf1->pTfs, pTfs);
|
|
||||||
|
|
||||||
const STfsFile *pf2 = tfsReaddir(pDir);
|
|
||||||
EXPECT_EQ(pf2, nullptr);
|
|
||||||
|
|
||||||
tfsClosedir(pDir);
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
{
|
||||||
STfsDir *pDir = tfsOpendir(pTfs, "t3");
|
STfsDir *pDir = tfsOpendir(pTfs, "t3");
|
||||||
|
@ -280,7 +264,26 @@ TEST_F(TfsTest, 04_File) {
|
||||||
EXPECT_EQ(pf1->pTfs, pTfs);
|
EXPECT_EQ(pf1->pTfs, pTfs);
|
||||||
|
|
||||||
const STfsFile *pf2 = tfsReaddir(pDir);
|
const STfsFile *pf2 = tfsReaddir(pDir);
|
||||||
EXPECT_NE(pf2, nullptr);
|
EXPECT_EQ(pf2, nullptr);
|
||||||
|
|
||||||
|
tfsClosedir(pDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
EXPECT_GT(tfsCopyFile(&f1, &f2), 0);
|
||||||
|
|
||||||
|
{
|
||||||
|
STfsDir *pDir = tfsOpendir(pTfs, "t3");
|
||||||
|
|
||||||
|
const STfsFile *pf1 = tfsReaddir(pDir);
|
||||||
|
EXPECT_NE(pf1, nullptr);
|
||||||
|
EXPECT_EQ(pf1->did.id, 0);
|
||||||
|
EXPECT_EQ(pf1->did.level, 0);
|
||||||
|
EXPECT_EQ(pf1->pTfs, pTfs);
|
||||||
|
|
||||||
|
const STfsFile *pf2 = tfsReaddir(pDir);
|
||||||
|
EXPECT_EQ(pf2->did.id, 0);
|
||||||
|
EXPECT_EQ(pf2->did.level, 0);
|
||||||
|
EXPECT_EQ(pf2->pTfs, pTfs);
|
||||||
|
|
||||||
const STfsFile *pf3 = tfsReaddir(pDir);
|
const STfsFile *pf3 = tfsReaddir(pDir);
|
||||||
EXPECT_EQ(pf3, nullptr);
|
EXPECT_EQ(pf3, nullptr);
|
||||||
|
@ -289,5 +292,415 @@ TEST_F(TfsTest, 04_File) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tfsClose(pTfs);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(TfsTest, 05_MultiDisk) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
const char *root00 = "/tmp/tfsTest00";
|
||||||
|
const char *root01 = "/tmp/tfsTest01";
|
||||||
|
const char *root10 = "/tmp/tfsTest10";
|
||||||
|
const char *root11 = "/tmp/tfsTest11";
|
||||||
|
const char *root12 = "/tmp/tfsTest12";
|
||||||
|
const char *root20 = "/tmp/tfsTest20";
|
||||||
|
const char *root21 = "/tmp/tfsTest21";
|
||||||
|
const char *root22 = "/tmp/tfsTest22";
|
||||||
|
const char *root23 = "/tmp/tfsTest23";
|
||||||
|
|
||||||
|
SDiskCfg dCfg[9] = {0};
|
||||||
|
tstrncpy(dCfg[0].dir, root01, TSDB_FILENAME_LEN);
|
||||||
|
dCfg[0].level = 0;
|
||||||
|
dCfg[0].primary = 0;
|
||||||
|
tstrncpy(dCfg[1].dir, root00, TSDB_FILENAME_LEN);
|
||||||
|
dCfg[1].level = 0;
|
||||||
|
dCfg[1].primary = 0;
|
||||||
|
tstrncpy(dCfg[2].dir, root20, TSDB_FILENAME_LEN);
|
||||||
|
dCfg[2].level = 2;
|
||||||
|
dCfg[2].primary = 0;
|
||||||
|
tstrncpy(dCfg[3].dir, root21, TSDB_FILENAME_LEN);
|
||||||
|
dCfg[3].level = 2;
|
||||||
|
dCfg[3].primary = 0;
|
||||||
|
tstrncpy(dCfg[4].dir, root22, TSDB_FILENAME_LEN);
|
||||||
|
dCfg[4].level = 2;
|
||||||
|
dCfg[4].primary = 0;
|
||||||
|
tstrncpy(dCfg[5].dir, root23, TSDB_FILENAME_LEN);
|
||||||
|
dCfg[5].level = 2;
|
||||||
|
dCfg[5].primary = 0;
|
||||||
|
tstrncpy(dCfg[6].dir, root10, TSDB_FILENAME_LEN);
|
||||||
|
dCfg[6].level = 1;
|
||||||
|
dCfg[6].primary = 0;
|
||||||
|
tstrncpy(dCfg[7].dir, root11, TSDB_FILENAME_LEN);
|
||||||
|
dCfg[7].level = 1;
|
||||||
|
dCfg[7].primary = 0;
|
||||||
|
tstrncpy(dCfg[8].dir, root12, TSDB_FILENAME_LEN);
|
||||||
|
dCfg[8].level = 1;
|
||||||
|
dCfg[8].primary = 0;
|
||||||
|
|
||||||
|
taosRemoveDir(root00);
|
||||||
|
taosRemoveDir(root01);
|
||||||
|
taosRemoveDir(root10);
|
||||||
|
taosRemoveDir(root11);
|
||||||
|
taosRemoveDir(root12);
|
||||||
|
taosRemoveDir(root20);
|
||||||
|
taosRemoveDir(root21);
|
||||||
|
taosRemoveDir(root22);
|
||||||
|
taosRemoveDir(root23);
|
||||||
|
taosMkDir(root00);
|
||||||
|
taosMkDir(root01);
|
||||||
|
taosMkDir(root10);
|
||||||
|
taosMkDir(root11);
|
||||||
|
taosMkDir(root12);
|
||||||
|
taosMkDir(root20);
|
||||||
|
taosMkDir(root21);
|
||||||
|
taosMkDir(root22);
|
||||||
|
taosMkDir(root23);
|
||||||
|
|
||||||
|
STfs *pTfs = tfsOpen(dCfg, 9);
|
||||||
|
ASSERT_EQ(pTfs, nullptr);
|
||||||
|
|
||||||
|
dCfg[0].primary = 1;
|
||||||
|
dCfg[1].primary = 1;
|
||||||
|
pTfs = tfsOpen(dCfg, 9);
|
||||||
|
ASSERT_EQ(pTfs, nullptr);
|
||||||
|
|
||||||
|
dCfg[0].primary = 0;
|
||||||
|
dCfg[1].primary = 1;
|
||||||
|
pTfs = tfsOpen(dCfg, 9);
|
||||||
|
ASSERT_NE(pTfs, nullptr);
|
||||||
|
|
||||||
|
tfsUpdateSize(pTfs);
|
||||||
|
SDiskSize size = tfsGetSize(pTfs);
|
||||||
|
|
||||||
|
EXPECT_GT(size.avail, 0);
|
||||||
|
EXPECT_GT(size.used, 0);
|
||||||
|
EXPECT_GT(size.total, size.avail);
|
||||||
|
EXPECT_GT(size.total, size.used);
|
||||||
|
|
||||||
|
//------------- AllocDisk -----------------//
|
||||||
|
{
|
||||||
|
const char *path = NULL;
|
||||||
|
SDiskID did;
|
||||||
|
did.id = 0;
|
||||||
|
did.level = 0;
|
||||||
|
|
||||||
|
code = tfsAllocDisk(pTfs, 0, &did);
|
||||||
|
EXPECT_EQ(code, 0);
|
||||||
|
EXPECT_EQ(did.id, 0);
|
||||||
|
EXPECT_EQ(did.level, 0);
|
||||||
|
path = tfsGetDiskPath(pTfs, did);
|
||||||
|
EXPECT_STREQ(path, root00);
|
||||||
|
|
||||||
|
code = tfsAllocDisk(pTfs, 0, &did);
|
||||||
|
EXPECT_EQ(code, 0);
|
||||||
|
EXPECT_EQ(did.id, 1);
|
||||||
|
EXPECT_EQ(did.level, 0);
|
||||||
|
path = tfsGetDiskPath(pTfs, did);
|
||||||
|
EXPECT_STREQ(path, root01);
|
||||||
|
|
||||||
|
code = tfsAllocDisk(pTfs, 0, &did);
|
||||||
|
EXPECT_EQ(code, 0);
|
||||||
|
EXPECT_EQ(did.id, 0);
|
||||||
|
EXPECT_EQ(did.level, 0);
|
||||||
|
path = tfsGetDiskPath(pTfs, did);
|
||||||
|
EXPECT_STREQ(path, root00);
|
||||||
|
|
||||||
|
code = tfsAllocDisk(pTfs, 0, &did);
|
||||||
|
EXPECT_EQ(code, 0);
|
||||||
|
EXPECT_EQ(did.id, 1);
|
||||||
|
EXPECT_EQ(did.level, 0);
|
||||||
|
path = tfsGetDiskPath(pTfs, did);
|
||||||
|
EXPECT_STREQ(path, root01);
|
||||||
|
|
||||||
|
code = tfsAllocDisk(pTfs, 0, &did);
|
||||||
|
EXPECT_EQ(code, 0);
|
||||||
|
EXPECT_EQ(did.id, 0);
|
||||||
|
EXPECT_EQ(did.level, 0);
|
||||||
|
path = tfsGetDiskPath(pTfs, did);
|
||||||
|
EXPECT_STREQ(path, root00);
|
||||||
|
|
||||||
|
code = tfsAllocDisk(pTfs, 0, &did);
|
||||||
|
EXPECT_EQ(code, 0);
|
||||||
|
EXPECT_EQ(did.id, 1);
|
||||||
|
EXPECT_EQ(did.level, 0);
|
||||||
|
path = tfsGetDiskPath(pTfs, did);
|
||||||
|
EXPECT_STREQ(path, root01);
|
||||||
|
|
||||||
|
code = tfsAllocDisk(pTfs, 1, &did);
|
||||||
|
EXPECT_EQ(code, 0);
|
||||||
|
EXPECT_EQ(did.id, 0);
|
||||||
|
EXPECT_EQ(did.level, 1);
|
||||||
|
path = tfsGetDiskPath(pTfs, did);
|
||||||
|
EXPECT_STREQ(path, root10);
|
||||||
|
|
||||||
|
code = tfsAllocDisk(pTfs, 1, &did);
|
||||||
|
EXPECT_EQ(code, 0);
|
||||||
|
EXPECT_EQ(did.id, 1);
|
||||||
|
EXPECT_EQ(did.level, 1);
|
||||||
|
path = tfsGetDiskPath(pTfs, did);
|
||||||
|
EXPECT_STREQ(path, root11);
|
||||||
|
|
||||||
|
code = tfsAllocDisk(pTfs, 1, &did);
|
||||||
|
EXPECT_EQ(code, 0);
|
||||||
|
EXPECT_EQ(did.id, 2);
|
||||||
|
EXPECT_EQ(did.level, 1);
|
||||||
|
path = tfsGetDiskPath(pTfs, did);
|
||||||
|
EXPECT_STREQ(path, root12);
|
||||||
|
|
||||||
|
code = tfsAllocDisk(pTfs, 1, &did);
|
||||||
|
EXPECT_EQ(code, 0);
|
||||||
|
EXPECT_EQ(did.id, 0);
|
||||||
|
EXPECT_EQ(did.level, 1);
|
||||||
|
path = tfsGetDiskPath(pTfs, did);
|
||||||
|
EXPECT_STREQ(path, root10);
|
||||||
|
|
||||||
|
code = tfsAllocDisk(pTfs, 2, &did);
|
||||||
|
EXPECT_EQ(code, 0);
|
||||||
|
EXPECT_EQ(did.id, 0);
|
||||||
|
EXPECT_EQ(did.level, 2);
|
||||||
|
path = tfsGetDiskPath(pTfs, did);
|
||||||
|
EXPECT_STREQ(path, root20);
|
||||||
|
|
||||||
|
code = tfsAllocDisk(pTfs, 2, &did);
|
||||||
|
EXPECT_EQ(code, 0);
|
||||||
|
EXPECT_EQ(did.id, 1);
|
||||||
|
EXPECT_EQ(did.level, 2);
|
||||||
|
path = tfsGetDiskPath(pTfs, did);
|
||||||
|
EXPECT_STREQ(path, root21);
|
||||||
|
|
||||||
|
code = tfsAllocDisk(pTfs, 2, &did);
|
||||||
|
EXPECT_EQ(code, 0);
|
||||||
|
EXPECT_EQ(did.id, 2);
|
||||||
|
EXPECT_EQ(did.level, 2);
|
||||||
|
path = tfsGetDiskPath(pTfs, did);
|
||||||
|
EXPECT_STREQ(path, root22);
|
||||||
|
|
||||||
|
code = tfsAllocDisk(pTfs, 2, &did);
|
||||||
|
EXPECT_EQ(code, 0);
|
||||||
|
EXPECT_EQ(did.id, 3);
|
||||||
|
EXPECT_EQ(did.level, 2);
|
||||||
|
path = tfsGetDiskPath(pTfs, did);
|
||||||
|
EXPECT_STREQ(path, root23);
|
||||||
|
|
||||||
|
code = tfsAllocDisk(pTfs, 2, &did);
|
||||||
|
EXPECT_EQ(code, 0);
|
||||||
|
EXPECT_EQ(did.id, 0);
|
||||||
|
EXPECT_EQ(did.level, 2);
|
||||||
|
path = tfsGetDiskPath(pTfs, did);
|
||||||
|
EXPECT_STREQ(path, root20);
|
||||||
|
|
||||||
|
code = tfsAllocDisk(pTfs, 3, &did);
|
||||||
|
EXPECT_EQ(code, 0);
|
||||||
|
EXPECT_EQ(did.id, 1);
|
||||||
|
EXPECT_EQ(did.level, 2);
|
||||||
|
path = tfsGetDiskPath(pTfs, did);
|
||||||
|
EXPECT_STREQ(path, root21);
|
||||||
|
|
||||||
|
code = tfsAllocDisk(pTfs, 4, &did);
|
||||||
|
EXPECT_EQ(code, 0);
|
||||||
|
EXPECT_EQ(did.id, 2);
|
||||||
|
EXPECT_EQ(did.level, 2);
|
||||||
|
path = tfsGetDiskPath(pTfs, did);
|
||||||
|
EXPECT_STREQ(path, root22);
|
||||||
|
|
||||||
|
const char *primary = tfsGetPrimaryPath(pTfs);
|
||||||
|
EXPECT_STREQ(primary, root00);
|
||||||
|
}
|
||||||
|
|
||||||
|
//------------- Dir -----------------//
|
||||||
|
{
|
||||||
|
char p1[] = "p1";
|
||||||
|
char ap00[128] = {0};
|
||||||
|
snprintf(ap00, 128, "%s%s%s", root00, TD_DIRSEP, p1);
|
||||||
|
char ap01[128] = {0};
|
||||||
|
snprintf(ap01, 128, "%s%s%s", root01, TD_DIRSEP, p1);
|
||||||
|
char ap10[128] = {0};
|
||||||
|
snprintf(ap10, 128, "%s%s%s", root10, TD_DIRSEP, p1);
|
||||||
|
char ap11[128] = {0};
|
||||||
|
snprintf(ap11, 128, "%s%s%s", root11, TD_DIRSEP, p1);
|
||||||
|
char ap12[128] = {0};
|
||||||
|
snprintf(ap12, 128, "%s%s%s", root12, TD_DIRSEP, p1);
|
||||||
|
char ap20[128] = {0};
|
||||||
|
snprintf(ap20, 128, "%s%s%s", root20, TD_DIRSEP, p1);
|
||||||
|
char ap21[128] = {0};
|
||||||
|
snprintf(ap21, 128, "%s%s%s", root21, TD_DIRSEP, p1);
|
||||||
|
char ap22[128] = {0};
|
||||||
|
snprintf(ap22, 128, "%s%s%s", root22, TD_DIRSEP, p1);
|
||||||
|
char ap23[128] = {0};
|
||||||
|
snprintf(ap23, 128, "%s%s%s", root23, TD_DIRSEP, p1);
|
||||||
|
EXPECT_NE(taosDirExist(ap00), 0);
|
||||||
|
EXPECT_NE(taosDirExist(ap01), 0);
|
||||||
|
EXPECT_NE(taosDirExist(ap10), 0);
|
||||||
|
EXPECT_NE(taosDirExist(ap11), 0);
|
||||||
|
EXPECT_NE(taosDirExist(ap12), 0);
|
||||||
|
EXPECT_NE(taosDirExist(ap20), 0);
|
||||||
|
EXPECT_NE(taosDirExist(ap21), 0);
|
||||||
|
EXPECT_NE(taosDirExist(ap22), 0);
|
||||||
|
EXPECT_NE(taosDirExist(ap23), 0);
|
||||||
|
EXPECT_EQ(tfsMkdir(pTfs, p1), 0);
|
||||||
|
EXPECT_EQ(taosDirExist(ap00), 0);
|
||||||
|
EXPECT_EQ(taosDirExist(ap01), 0);
|
||||||
|
EXPECT_EQ(taosDirExist(ap10), 0);
|
||||||
|
EXPECT_EQ(taosDirExist(ap11), 0);
|
||||||
|
EXPECT_EQ(taosDirExist(ap12), 0);
|
||||||
|
EXPECT_EQ(taosDirExist(ap20), 0);
|
||||||
|
EXPECT_EQ(taosDirExist(ap21), 0);
|
||||||
|
EXPECT_EQ(taosDirExist(ap22), 0);
|
||||||
|
EXPECT_EQ(taosDirExist(ap23), 0);
|
||||||
|
EXPECT_EQ(tfsRmdir(pTfs, p1), 0);
|
||||||
|
EXPECT_NE(taosDirExist(ap00), 0);
|
||||||
|
EXPECT_NE(taosDirExist(ap01), 0);
|
||||||
|
EXPECT_NE(taosDirExist(ap10), 0);
|
||||||
|
EXPECT_NE(taosDirExist(ap11), 0);
|
||||||
|
EXPECT_NE(taosDirExist(ap12), 0);
|
||||||
|
EXPECT_NE(taosDirExist(ap20), 0);
|
||||||
|
EXPECT_NE(taosDirExist(ap21), 0);
|
||||||
|
EXPECT_NE(taosDirExist(ap22), 0);
|
||||||
|
EXPECT_NE(taosDirExist(ap23), 0);
|
||||||
|
|
||||||
|
char p2[] = "p2";
|
||||||
|
char _ap21[128] = {0};
|
||||||
|
snprintf(_ap21, 128, "%s%s%s", root21, TD_DIRSEP, p2);
|
||||||
|
SDiskID did = {0};
|
||||||
|
did.level = 2;
|
||||||
|
did.id = 1;
|
||||||
|
EXPECT_NE(taosDirExist(_ap21), 0);
|
||||||
|
EXPECT_EQ(tfsMkdirAt(pTfs, p2, did), 0);
|
||||||
|
EXPECT_EQ(taosDirExist(_ap21), 0);
|
||||||
|
|
||||||
|
char p3[] = "p3/p2/p1/p0";
|
||||||
|
char _ap12[128] = {0};
|
||||||
|
snprintf(_ap12, 128, "%s%s%s", root12, TD_DIRSEP, p3);
|
||||||
|
did.level = 1;
|
||||||
|
did.id = 2;
|
||||||
|
EXPECT_NE(taosDirExist(_ap12), 0);
|
||||||
|
EXPECT_NE(tfsMkdir(pTfs, p3), 0);
|
||||||
|
EXPECT_NE(tfsMkdirAt(pTfs, p3, did), 0);
|
||||||
|
EXPECT_EQ(tfsMkdirRecurAt(pTfs, p3, did), 0);
|
||||||
|
EXPECT_EQ(taosDirExist(_ap12), 0);
|
||||||
|
EXPECT_EQ(tfsRmdir(pTfs, p3), 0);
|
||||||
|
EXPECT_NE(taosDirExist(_ap12), 0);
|
||||||
|
|
||||||
|
char p45[] = "p5";
|
||||||
|
char p44[] = "p4";
|
||||||
|
char p4[] = "p4/p2/p1/p0";
|
||||||
|
char _ap22[128] = {0};
|
||||||
|
snprintf(_ap22, 128, "%s%s%s", root22, TD_DIRSEP, p4);
|
||||||
|
did.level = 2;
|
||||||
|
did.id = 2;
|
||||||
|
|
||||||
|
EXPECT_NE(taosDirExist(_ap22), 0);
|
||||||
|
EXPECT_EQ(tfsMkdirRecurAt(pTfs, p4, did), 0);
|
||||||
|
EXPECT_EQ(taosDirExist(_ap22), 0);
|
||||||
|
EXPECT_EQ(tfsRename(pTfs, p44, p45), 0);
|
||||||
|
EXPECT_EQ(tfsRmdir(pTfs, p4), 0);
|
||||||
|
EXPECT_NE(taosDirExist(_ap22), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
//------------- File -----------------//
|
||||||
|
{
|
||||||
|
STfsFile file0;
|
||||||
|
STfsFile file1;
|
||||||
|
STfsFile file2;
|
||||||
|
STfsFile file3;
|
||||||
|
STfsFile file4;
|
||||||
|
SDiskID did0 = {0};
|
||||||
|
SDiskID did1 = {0};
|
||||||
|
SDiskID did2 = {0};
|
||||||
|
SDiskID did3 = {0};
|
||||||
|
SDiskID did4 = {0};
|
||||||
|
did3.id = 1;
|
||||||
|
did4.level = 1;
|
||||||
|
tfsInitFile(pTfs, &file0, did0, "fname");
|
||||||
|
tfsInitFile(pTfs, &file1, did1, "fname");
|
||||||
|
tfsInitFile(pTfs, &file2, did2, "fnamex");
|
||||||
|
tfsInitFile(pTfs, &file3, did3, "fname");
|
||||||
|
tfsInitFile(pTfs, &file4, did4, "fname");
|
||||||
|
|
||||||
|
EXPECT_TRUE(tfsIsSameFile(&file0, &file1));
|
||||||
|
EXPECT_FALSE(tfsIsSameFile(&file0, &file2));
|
||||||
|
EXPECT_FALSE(tfsIsSameFile(&file0, &file3));
|
||||||
|
EXPECT_FALSE(tfsIsSameFile(&file0, &file4));
|
||||||
|
|
||||||
|
{
|
||||||
|
char n1[] = "t3/t1.json";
|
||||||
|
char n2[] = "t3/t2.json";
|
||||||
|
STfsFile f1 = {0};
|
||||||
|
STfsFile f2 = {0};
|
||||||
|
SDiskID did;
|
||||||
|
did1.level = 1;
|
||||||
|
did1.id = 2;
|
||||||
|
did2.level = 2;
|
||||||
|
did2.id = 3;
|
||||||
|
|
||||||
|
tfsInitFile(pTfs, &f1, did1, n1);
|
||||||
|
tfsInitFile(pTfs, &f2, did2, n2);
|
||||||
|
|
||||||
|
EXPECT_EQ(tfsMkdir(pTfs, "t3"), 0);
|
||||||
|
|
||||||
|
FILE *fp = fopen(f1.aname, "w");
|
||||||
|
ASSERT_NE(fp, nullptr);
|
||||||
|
fwrite("12345678", 1, 5, fp);
|
||||||
|
fclose(fp);
|
||||||
|
|
||||||
|
char base[128] = {0};
|
||||||
|
tfsBasename(&f1, base);
|
||||||
|
char dir[128] = {0};
|
||||||
|
tfsDirname(&f1, dir);
|
||||||
|
|
||||||
|
EXPECT_STREQ(base, "t1.json");
|
||||||
|
|
||||||
|
char fulldir[128];
|
||||||
|
snprintf(fulldir, 128, "%s%s%s", root12, TD_DIRSEP, "t3");
|
||||||
|
EXPECT_STREQ(dir, fulldir);
|
||||||
|
|
||||||
|
EXPECT_GT(tfsCopyFile(&f1, &f2), 0);
|
||||||
|
|
||||||
|
char af2[128] = {0};
|
||||||
|
snprintf(af2, 128, "%s%s%s", root23, TD_DIRSEP, n2);
|
||||||
|
EXPECT_EQ(taosDirExist(af2), 0);
|
||||||
|
tfsRemoveFile(&f2);
|
||||||
|
|
||||||
|
{
|
||||||
|
STfsDir *pDir = tfsOpendir(pTfs, "t3");
|
||||||
|
|
||||||
|
const STfsFile *pf1 = tfsReaddir(pDir);
|
||||||
|
EXPECT_NE(pf1, nullptr);
|
||||||
|
EXPECT_EQ(pf1->did.level, 1);
|
||||||
|
EXPECT_EQ(pf1->did.id, 2);
|
||||||
|
EXPECT_EQ(pf1->pTfs, pTfs);
|
||||||
|
|
||||||
|
const STfsFile *pf2 = tfsReaddir(pDir);
|
||||||
|
EXPECT_EQ(pf2, nullptr);
|
||||||
|
|
||||||
|
tfsClosedir(pDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
EXPECT_NE(taosDirExist(af2), 0);
|
||||||
|
EXPECT_GT(tfsCopyFile(&f1, &f2), 0);
|
||||||
|
|
||||||
|
{
|
||||||
|
STfsDir *pDir = tfsOpendir(pTfs, "t3");
|
||||||
|
|
||||||
|
const STfsFile *pf1 = tfsReaddir(pDir);
|
||||||
|
EXPECT_NE(pf1, nullptr);
|
||||||
|
EXPECT_GT(pf1->did.level, 0);
|
||||||
|
EXPECT_GT(pf1->did.id, 0);
|
||||||
|
EXPECT_EQ(pf1->pTfs, pTfs);
|
||||||
|
|
||||||
|
const STfsFile *pf2 = tfsReaddir(pDir);
|
||||||
|
EXPECT_NE(pf1, nullptr);
|
||||||
|
EXPECT_GT(pf1->did.level, 0);
|
||||||
|
EXPECT_GT(pf1->did.id, 0);
|
||||||
|
EXPECT_EQ(pf1->pTfs, pTfs);
|
||||||
|
|
||||||
|
const STfsFile *pf3 = tfsReaddir(pDir);
|
||||||
|
EXPECT_EQ(pf3, nullptr);
|
||||||
|
|
||||||
|
tfsClosedir(pDir);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
tfsClose(pTfs);
|
tfsClose(pTfs);
|
||||||
}
|
}
|
|
@ -70,7 +70,7 @@ createNewCfgFile() {
|
||||||
echo "dataDir ${dataDir}" >> ${cfgFile}
|
echo "dataDir ${dataDir}" >> ${cfgFile}
|
||||||
echo "logDir ${logDir}" >> ${cfgFile}
|
echo "logDir ${logDir}" >> ${cfgFile}
|
||||||
echo "serverPort ${serverPort}" >> ${cfgFile}
|
echo "serverPort ${serverPort}" >> ${cfgFile}
|
||||||
|
echo "numOfLogLines 100000000" >> ${cfgFile}
|
||||||
echo "supportVnodes 1024" >> ${cfgFile}
|
echo "supportVnodes 1024" >> ${cfgFile}
|
||||||
#echo "asyncLog 0" >> ${cfgFile}
|
#echo "asyncLog 0" >> ${cfgFile}
|
||||||
echo "telemetryReporting 0" >> ${cfgFile}
|
echo "telemetryReporting 0" >> ${cfgFile}
|
||||||
|
|
Loading…
Reference in New Issue