Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/vnode
This commit is contained in:
commit
6f741e70cb
|
@ -234,6 +234,92 @@ static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey)
|
|||
return buf;
|
||||
}
|
||||
|
||||
typedef struct SMqHbVgInfo {
|
||||
int32_t vgId;
|
||||
} SMqHbVgInfo;
|
||||
|
||||
static FORCE_INLINE int taosEncodeSMqVgInfo(void** buf, const SMqHbVgInfo* pVgInfo) {
|
||||
int tlen = 0;
|
||||
tlen += taosEncodeFixedI32(buf, pVgInfo->vgId);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* taosDecodeSMqVgInfo(void* buf, SMqHbVgInfo* pVgInfo) {
|
||||
buf = taosDecodeFixedI32(buf, &pVgInfo->vgId);
|
||||
return buf;
|
||||
}
|
||||
|
||||
typedef struct SMqHbTopicInfo {
|
||||
int32_t epoch;
|
||||
int64_t topicUid;
|
||||
char name[TSDB_TOPIC_FNAME_LEN];
|
||||
SArray* pVgInfo;
|
||||
} SMqHbTopicInfo;
|
||||
|
||||
static FORCE_INLINE int taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbTopicInfo* pTopicInfo) {
|
||||
int tlen = 0;
|
||||
tlen += taosEncodeFixedI32(buf, pTopicInfo->epoch);
|
||||
tlen += taosEncodeFixedI64(buf, pTopicInfo->topicUid);
|
||||
tlen += taosEncodeString(buf, pTopicInfo->name);
|
||||
int32_t sz = taosArrayGetSize(pTopicInfo->pVgInfo);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqHbVgInfo* pVgInfo = (SMqHbVgInfo*)taosArrayGet(pTopicInfo->pVgInfo, i);
|
||||
tlen += taosEncodeSMqVgInfo(buf, pVgInfo);
|
||||
}
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* taosDecodeSMqHbTopicInfoMsg(void* buf, SMqHbTopicInfo* pTopicInfo) {
|
||||
buf = taosDecodeFixedI32(buf, &pTopicInfo->epoch);
|
||||
buf = taosDecodeFixedI64(buf, &pTopicInfo->topicUid);
|
||||
buf = taosDecodeStringTo(buf, pTopicInfo->name);
|
||||
int32_t sz;
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pTopicInfo->pVgInfo = taosArrayInit(sz, sizeof(SMqHbVgInfo));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqHbVgInfo vgInfo;
|
||||
buf = taosDecodeSMqVgInfo(buf, &vgInfo);
|
||||
taosArrayPush(pTopicInfo->pVgInfo, &vgInfo);
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
typedef struct SMqHbMsg {
|
||||
int32_t status; // ask hb endpoint
|
||||
int32_t epoch;
|
||||
int64_t consumerId;
|
||||
SArray* pTopics; // SArray<SMqHbTopicInfo>
|
||||
} SMqHbMsg;
|
||||
|
||||
static FORCE_INLINE int taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) {
|
||||
int tlen = 0;
|
||||
tlen += taosEncodeFixedI32(buf, pMsg->status);
|
||||
tlen += taosEncodeFixedI32(buf, pMsg->epoch);
|
||||
tlen += taosEncodeFixedI64(buf, pMsg->consumerId);
|
||||
int32_t sz = taosArrayGetSize(pMsg->pTopics);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int i = 0; i < sz; i++) {
|
||||
SMqHbTopicInfo* topicInfo = (SMqHbTopicInfo*)taosArrayGet(pMsg->pTopics, i);
|
||||
tlen += taosEncodeSMqHbTopicInfoMsg(buf, topicInfo);
|
||||
}
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
|
||||
buf = taosDecodeFixedI32(buf, &pMsg->status);
|
||||
buf = taosDecodeFixedI32(buf, &pMsg->epoch);
|
||||
buf = taosDecodeFixedI64(buf, &pMsg->consumerId);
|
||||
int32_t sz;
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pMsg->pTopics = taosArrayInit(sz, sizeof(SMqHbTopicInfo));
|
||||
for (int i = 0; i < sz; i++) {
|
||||
SMqHbTopicInfo topicInfo;
|
||||
buf = taosDecodeSMqHbTopicInfoMsg(buf, &topicInfo);
|
||||
taosArrayPush(pMsg->pTopics, &topicInfo);
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
|
@ -399,6 +485,66 @@ static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) {
|
|||
return buf;
|
||||
}
|
||||
|
||||
typedef struct SMqHbOneTopicBatchRsp {
|
||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||
SArray* rsps; // SArray<SMqHbRsp>
|
||||
} SMqHbOneTopicBatchRsp;
|
||||
|
||||
static FORCE_INLINE int taosEncodeSMqHbOneTopicBatchRsp(void** buf, const SMqHbOneTopicBatchRsp* pBatchRsp) {
|
||||
int tlen = 0;
|
||||
tlen += taosEncodeString(buf, pBatchRsp->topicName);
|
||||
int32_t sz = taosArrayGetSize(pBatchRsp->rsps);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqHbRsp* pRsp = (SMqHbRsp*)taosArrayGet(pBatchRsp->rsps, i);
|
||||
tlen += taosEncodeSMqHbRsp(buf, pRsp);
|
||||
}
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* taosDecodeSMqHbOneTopicBatchRsp(void* buf, SMqHbOneTopicBatchRsp* pBatchRsp) {
|
||||
int32_t sz;
|
||||
buf = taosDecodeStringTo(buf, pBatchRsp->topicName);
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pBatchRsp->rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqHbRsp rsp;
|
||||
buf = taosDecodeSMqHbRsp(buf, &rsp);
|
||||
buf = taosArrayPush(pBatchRsp->rsps, &rsp);
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
typedef struct SMqHbBatchRsp {
|
||||
int64_t consumerId;
|
||||
SArray* batchRsps; // SArray<SMqHbOneTopicBatchRsp>
|
||||
} SMqHbBatchRsp;
|
||||
|
||||
static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) {
|
||||
int tlen = 0;
|
||||
tlen += taosEncodeFixedI64(buf, pBatchRsp->consumerId);
|
||||
int32_t sz;
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*) taosArrayGet(pBatchRsp->batchRsps, i);
|
||||
tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp);
|
||||
}
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBatchRsp) {
|
||||
buf = taosDecodeFixedI64(buf, &pBatchRsp->consumerId);
|
||||
int32_t sz;
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqHbOneTopicBatchRsp rsp;
|
||||
buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp);
|
||||
buf = taosArrayPush(pBatchRsp->batchRsps, &rsp);
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
int32_t acctId;
|
||||
int64_t clusterId;
|
||||
|
|
|
@ -48,7 +48,7 @@ typedef struct SOutputData {
|
|||
int8_t compressed;
|
||||
char* pData;
|
||||
bool queryEnd;
|
||||
bool needSchedule;
|
||||
int32_t scheduleJobNo;
|
||||
int32_t bufStatus;
|
||||
int64_t useconds;
|
||||
int8_t precision;
|
||||
|
|
|
@ -55,7 +55,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
|
|||
|
||||
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||
|
||||
int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||
|
||||
int32_t qWorkerProcessDataSinkMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||
|
||||
|
|
|
@ -59,6 +59,11 @@ typedef struct SQueryResult {
|
|||
char *msg;
|
||||
} SQueryResult;
|
||||
|
||||
typedef struct STaskInfo {
|
||||
SQueryNodeAddr addr;
|
||||
SSubQueryMsg *msg;
|
||||
} STaskInfo;
|
||||
|
||||
int32_t schedulerInit(SSchedulerCfg *cfg);
|
||||
|
||||
/**
|
||||
|
@ -101,6 +106,17 @@ void scheduleFreeJob(void *pJob);
|
|||
|
||||
void schedulerDestroy(void);
|
||||
|
||||
/**
|
||||
* convert dag to task list
|
||||
* @param pDag
|
||||
* @param pTasks SArray**<STaskInfo>
|
||||
* @return
|
||||
*/
|
||||
int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks);
|
||||
|
||||
void schedulerFreeTaskList(SArray *taskList);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -356,7 +356,11 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_QRY_TASK_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0712) //"Task already exist")
|
||||
#define TSDB_CODE_QRY_RES_CACHE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0713) //"Task result cache not exist")
|
||||
#define TSDB_CODE_QRY_TASK_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x0714) //"Task cancelled")
|
||||
|
||||
#define TSDB_CODE_QRY_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x0715) //"Task dropped")
|
||||
#define TSDB_CODE_QRY_TASK_CANCELLING TAOS_DEF_ERROR_CODE(0, 0x0716) //"Task cancelling")
|
||||
#define TSDB_CODE_QRY_TASK_DROPPING TAOS_DEF_ERROR_CODE(0, 0x0717) //"Task dropping")
|
||||
#define TSDB_CODE_QRY_DUPLICATTED_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0718) //"Duplicatted operation")
|
||||
#define TSDB_CODE_QRY_TASK_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x0719) //"Task message error")
|
||||
|
||||
// grant
|
||||
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired")
|
||||
|
|
|
@ -322,24 +322,27 @@ SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) {
|
|||
taosArrayDestroy(pArray);
|
||||
return NULL;
|
||||
}
|
||||
strcpy(kv.key, "groupId");
|
||||
kv.keyLen = strlen("groupId") + 1;
|
||||
kv.value = malloc(256);
|
||||
if (kv.value == NULL) {
|
||||
free(kv.key);
|
||||
taosArrayDestroy(pArray);
|
||||
return NULL;
|
||||
strcpy(kv.key, "mq-tmp");
|
||||
kv.keyLen = strlen("mq-tmp") + 1;
|
||||
SMqHbMsg* pMqHb = malloc(sizeof(SMqHbMsg));
|
||||
if (pMqHb == NULL) {
|
||||
return pArray;
|
||||
}
|
||||
strcpy(kv.value, pTmq->groupId);
|
||||
kv.valueLen = strlen(pTmq->groupId) + 1;
|
||||
|
||||
pMqHb->consumerId = connKey.connId;
|
||||
SArray* clientTopics = pTmq->clientTopics;
|
||||
int sz = taosArrayGetSize(clientTopics);
|
||||
for (int i = 0; i < sz; i++) {
|
||||
SMqClientTopic* pCTopic = taosArrayGet(clientTopics, i);
|
||||
if (pCTopic->vgId == -1) {
|
||||
pMqHb->status = 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
kv.value = pMqHb;
|
||||
kv.valueLen = sizeof(SMqHbMsg);
|
||||
taosArrayPush(pArray, &kv);
|
||||
strcpy(kv.key, "clientUid");
|
||||
kv.keyLen = strlen("clientUid") + 1;
|
||||
*(uint32_t*)kv.value = pTmq->pTscObj->connId;
|
||||
kv.valueLen = sizeof(uint32_t);
|
||||
|
||||
return NULL;
|
||||
return pArray;
|
||||
}
|
||||
|
||||
tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
|
||||
|
|
|
@ -277,12 +277,15 @@ TEST(testCase, connect_Test) {
|
|||
// ASSERT_EQ(numOfFields, 0);
|
||||
//
|
||||
// taos_free_result(pRes);
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
//
|
||||
// pRes = taos_query(pConn, "create stable if not exists abc1.`123_$^)` (ts timestamp, `abc` int) tags(a int)");
|
||||
// if (taos_errno(pRes) != 0) {
|
||||
// printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));
|
||||
// }
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
// taos_free_result(pRes);
|
||||
// pRes = taos_query(pConn, "drop stable `123_$^)`");
|
||||
// if (taos_errno(pRes) != 0) {
|
||||
|
@ -518,30 +521,30 @@ TEST(testCase, show_stable_Test) {
|
|||
// taosHashCleanup(phash);
|
||||
//}
|
||||
//
|
||||
//TEST(testCase, create_topic_Test) {
|
||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
// assert(pConn != NULL);
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
// if (taos_errno(pRes) != 0) {
|
||||
// printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||
// }
|
||||
// taos_free_result(pRes);
|
||||
//
|
||||
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
// ASSERT_TRUE(pFields == nullptr);
|
||||
//
|
||||
// int32_t numOfFields = taos_num_fields(pRes);
|
||||
// ASSERT_EQ(numOfFields, 0);
|
||||
//
|
||||
// taos_free_result(pRes);
|
||||
//
|
||||
// char* sql = "select * from tu";
|
||||
// pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
|
||||
// taos_free_result(pRes);
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
//
|
||||
TEST(testCase, create_topic_Test) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
ASSERT_TRUE(pFields == nullptr);
|
||||
|
||||
int32_t numOfFields = taos_num_fields(pRes);
|
||||
ASSERT_EQ(numOfFields, 0);
|
||||
|
||||
taos_free_result(pRes);
|
||||
|
||||
char* sql = "select * from tu";
|
||||
pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
//TEST(testCase, insert_test) {
|
||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
// ASSERT_NE(pConn, nullptr);
|
||||
|
|
|
@ -92,7 +92,7 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) {
|
|||
int32_t kvNum = taosHashGetSize(pReq->info);
|
||||
tlen += taosEncodeFixedI32(buf, kvNum);
|
||||
SKv kv;
|
||||
void* pIter = taosHashIterate(pReq->info, pIter);
|
||||
void* pIter = taosHashIterate(pReq->info, NULL);
|
||||
while (pIter != NULL) {
|
||||
taosHashGetKey(pIter, &kv.key, (size_t *)&kv.keyLen);
|
||||
kv.valueLen = taosHashGetDataLen(pIter);
|
||||
|
|
|
@ -346,19 +346,23 @@ typedef struct SMqTopicObj {
|
|||
char *logicalPlan;
|
||||
char *physicalPlan;
|
||||
SHashObj *cgroups; // SHashObj<SMqCGroup>
|
||||
SHashObj *consumers; // SHashObj<SMqConsumerObj>
|
||||
} SMqTopicObj;
|
||||
|
||||
// TODO: add cache and change name to id
|
||||
typedef struct SMqConsumerTopic {
|
||||
char name[TSDB_TOPIC_NAME_LEN];
|
||||
SList *vgroups; // SList<int32_t>
|
||||
int32_t epoch;
|
||||
char name[TSDB_TOPIC_NAME_LEN];
|
||||
//TODO: replace with something with ep
|
||||
SList *vgroups; // SList<int32_t>
|
||||
} SMqConsumerTopic;
|
||||
|
||||
typedef struct SMqConsumerObj {
|
||||
SRWLatch lock;
|
||||
int64_t consumerId;
|
||||
SRWLatch lock;
|
||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
SArray *topics; // SArray<SMqConsumerTopic>
|
||||
SHashObj *topicHash;
|
||||
} SMqConsumerObj;
|
||||
|
||||
typedef struct SMqSubConsumerObj {
|
||||
|
|
|
@ -15,10 +15,13 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "mndProfile.h"
|
||||
#include "mndConsumer.h"
|
||||
#include "mndDb.h"
|
||||
#include "mndMnode.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndTopic.h"
|
||||
#include "mndUser.h"
|
||||
#include "mndVgroup.h"
|
||||
|
||||
#define QUERY_ID_SIZE 20
|
||||
#define QUERY_OBJ_ID_SIZE 18
|
||||
|
@ -269,29 +272,95 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SClientHbRsp* mndMqHbBuildRsp(SMnode* pMnode, SClientHbReq* pReq) {
|
||||
SClientHbRsp* pRsp = malloc(sizeof(SClientHbRsp));
|
||||
if (pRsp == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
pRsp->connKey = pReq->connKey;
|
||||
SMqHbBatchRsp batchRsp;
|
||||
batchRsp.batchRsps = taosArrayInit(0, sizeof(SMqHbRsp));
|
||||
if (batchRsp.batchRsps == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
SClientHbKey connKey = pReq->connKey;
|
||||
SHashObj* pObj = pReq->info;
|
||||
SKv* pKv = taosHashGet(pObj, "mq-tmp", strlen("mq-tmp") + 1);
|
||||
if (pKv == NULL) {
|
||||
free(pRsp);
|
||||
return NULL;
|
||||
}
|
||||
SMqHbMsg mqHb;
|
||||
taosDecodeSMqMsg(pKv->value, &mqHb);
|
||||
/*int64_t clientUid = htonl(pKv->value);*/
|
||||
/*if (mqHb.epoch )*/
|
||||
int sz = taosArrayGetSize(mqHb.pTopics);
|
||||
SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, mqHb.consumerId);
|
||||
for (int i = 0; i < sz; i++) {
|
||||
SMqHbOneTopicBatchRsp innerBatchRsp;
|
||||
innerBatchRsp.rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
|
||||
if (innerBatchRsp.rsps == NULL) {
|
||||
//TODO
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
SMqHbTopicInfo* topicInfo = taosArrayGet(mqHb.pTopics, i);
|
||||
SMqConsumerTopic* pConsumerTopic = taosHashGet(pConsumer->topicHash, topicInfo->name, strlen(topicInfo->name)+1);
|
||||
if (pConsumerTopic->epoch != topicInfo->epoch) {
|
||||
//add new vgids into rsp
|
||||
int vgSz = taosArrayGetSize(topicInfo->pVgInfo);
|
||||
for (int j = 0; j < vgSz; j++) {
|
||||
SMqHbRsp innerRsp;
|
||||
SMqHbVgInfo* pVgInfo = taosArrayGet(topicInfo->pVgInfo, i);
|
||||
SVgObj* pVgObj = mndAcquireVgroup(pMnode, pVgInfo->vgId);
|
||||
innerRsp.epSet = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
taosArrayPush(innerBatchRsp.rsps, &innerRsp);
|
||||
}
|
||||
}
|
||||
taosArrayPush(batchRsp.batchRsps, &innerBatchRsp);
|
||||
}
|
||||
int32_t tlen = taosEncodeSMqHbBatchRsp(NULL, &batchRsp);
|
||||
void* buf = malloc(tlen);
|
||||
if (buf == NULL) {
|
||||
//TODO
|
||||
return NULL;
|
||||
}
|
||||
void* abuf = buf;
|
||||
taosEncodeSMqHbBatchRsp(&abuf, &batchRsp);
|
||||
pRsp->body = buf;
|
||||
pRsp->bodyLen = tlen;
|
||||
return pRsp;
|
||||
}
|
||||
|
||||
static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
char *batchReqStr = pReq->rpcMsg.pCont;
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
char *batchReqStr = pReq->rpcMsg.pCont;
|
||||
SClientHbBatchReq batchReq = {0};
|
||||
tDeserializeSClientHbBatchReq(batchReqStr, &batchReq);
|
||||
SArray *pArray = batchReq.reqs;
|
||||
int sz = taosArrayGetSize(pArray);
|
||||
int sz = taosArrayGetSize(pArray);
|
||||
|
||||
SClientHbBatchRsp batchRsp = {0};
|
||||
batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
|
||||
|
||||
for (int i = 0; i < sz; i++) {
|
||||
SClientHbReq *pHbReq = taosArrayGet(pArray, i);
|
||||
SClientHbReq* pHbReq = taosArrayGet(pArray, i);
|
||||
if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) {
|
||||
|
||||
} else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) {
|
||||
SClientHbRsp rsp = {.status = 0, .connKey = pHbReq->connKey, .bodyLen = 0, .body = NULL};
|
||||
taosArrayPush(batchRsp.rsps, &rsp);
|
||||
SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
|
||||
if (pRsp != NULL) {
|
||||
taosArrayPush(batchRsp.rsps, pRsp);
|
||||
free(pRsp);
|
||||
}
|
||||
}
|
||||
}
|
||||
int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp);
|
||||
void *buf = rpcMallocCont(tlen);
|
||||
void *bufCopy = buf;
|
||||
tSerializeSClientHbBatchRsp(&bufCopy, &batchRsp);
|
||||
void* buf = rpcMallocCont(tlen);
|
||||
void* abuf = buf;
|
||||
tSerializeSClientHbBatchRsp(&abuf, &batchRsp);
|
||||
pReq->contLen = tlen;
|
||||
pReq->pCont = buf;
|
||||
return 0;
|
||||
|
|
|
@ -74,6 +74,15 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
|||
SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
||||
int32_t logicalPlanLen = strlen(pTopic->logicalPlan) + 1;
|
||||
SDB_SET_INT32(pRaw, dataPos, strlen(pTopic->logicalPlan)+1, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, logicalPlanLen, TOPIC_ENCODE_OVER);
|
||||
|
||||
int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
|
||||
pTopic->physicalPlan = calloc(physicalPlanLen, sizeof(char));
|
||||
if (pTopic->physicalPlan == NULL) goto TOPIC_ENCODE_OVER;
|
||||
SDB_SET_INT32(pRaw, dataPos, strlen(pTopic->physicalPlan)+1, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
|
||||
|
||||
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);
|
||||
|
@ -83,6 +92,12 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
|||
TOPIC_ENCODE_OVER:
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
|
||||
/*if (pTopic->logicalPlan) {*/
|
||||
/*free(pTopic->logicalPlan);*/
|
||||
/*}*/
|
||||
/*if (pTopic->physicalPlan) {*/
|
||||
/*free(pTopic->physicalPlan);*/
|
||||
/*}*/
|
||||
sdbFreeRaw(pRaw);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -121,10 +136,23 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
|||
|
||||
pTopic->sql = calloc(pTopic->sqlLen + 1, sizeof(char));
|
||||
SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
|
||||
// SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
|
||||
// SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len, TOPIC_DECODE_OVER);
|
||||
// SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
|
||||
// SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
|
||||
|
||||
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
|
||||
pTopic->logicalPlan = calloc(len+1, sizeof(char));
|
||||
if (pTopic->logicalPlan == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto TOPIC_DECODE_OVER;
|
||||
}
|
||||
SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len+1, TOPIC_DECODE_OVER);
|
||||
|
||||
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
|
||||
pTopic->logicalPlan = calloc(len + 1, sizeof(char));
|
||||
if (pTopic->physicalPlan == NULL) {
|
||||
free(pTopic->logicalPlan);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto TOPIC_DECODE_OVER;
|
||||
}
|
||||
SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len+1, TOPIC_DECODE_OVER);
|
||||
|
||||
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER)
|
||||
|
||||
|
|
|
@ -121,11 +121,11 @@ TEST_F(MndTestProfile, 04_HeartBeatMsg) {
|
|||
SClientHbBatchRsp rsp = {0};
|
||||
tDeserializeSClientHbBatchRsp(pRspChar, &rsp);
|
||||
int sz = taosArrayGetSize(rsp.rsps);
|
||||
ASSERT_EQ(sz, 1);
|
||||
SClientHbRsp* pRsp = (SClientHbRsp*) taosArrayGet(rsp.rsps, 0);
|
||||
EXPECT_EQ(pRsp->connKey.connId, 123);
|
||||
EXPECT_EQ(pRsp->connKey.hbType, HEARTBEAT_TYPE_MQ);
|
||||
EXPECT_EQ(pRsp->status, 0);
|
||||
ASSERT_EQ(sz, 0);
|
||||
//SClientHbRsp* pRsp = (SClientHbRsp*) taosArrayGet(rsp.rsps, 0);
|
||||
//EXPECT_EQ(pRsp->connKey.connId, 123);
|
||||
//EXPECT_EQ(pRsp->connKey.hbType, HEARTBEAT_TYPE_MQ);
|
||||
//EXPECT_EQ(pRsp->status, 0);
|
||||
|
||||
#if 0
|
||||
int32_t contLen = sizeof(SHeartBeatReq);
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include "trpc.h"
|
||||
#include "ttimer.h"
|
||||
#include "tutil.h"
|
||||
#include "meta.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -314,6 +315,26 @@ const void* tqDeserializeGroup(const STqSerializedHead*, STqGroup**);
|
|||
|
||||
static int tqQueryExecuting(int32_t status) { return status; }
|
||||
|
||||
typedef struct STqReadHandle {
|
||||
int64_t ver;
|
||||
SSubmitMsg* pMsg;
|
||||
SSubmitBlk* pBlock;
|
||||
SSubmitMsgIter msgIter;
|
||||
SSubmitBlkIter blkIter;
|
||||
SMeta* pMeta;
|
||||
} STqReadHandle;
|
||||
|
||||
typedef struct SSubmitBlkScanInfo {
|
||||
|
||||
} SSubmitBlkScanInfo;
|
||||
|
||||
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg);
|
||||
bool tqNextDataBlock(STqReadHandle* pHandle);
|
||||
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo *pBlockInfo);
|
||||
//return SArray<SColumnInfoData>
|
||||
SArray *tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList);
|
||||
//int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t status);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -607,3 +607,70 @@ int tqItemSSize() {
|
|||
// mainly for executor
|
||||
return 0;
|
||||
}
|
||||
|
||||
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg) {
|
||||
STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle));
|
||||
if (pReadHandle == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
pReadHandle->pMeta = pMeta;
|
||||
pReadHandle->pMsg = pMsg;
|
||||
tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter);
|
||||
pReadHandle->ver = -1;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
bool tqNextDataBlock(STqReadHandle* pHandle) {
|
||||
if(tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) {
|
||||
SMemRow row;
|
||||
int32_t sversion = pHandle->pBlock->sversion;
|
||||
SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);
|
||||
pBlockInfo->numOfCols = pSchema->nCols;
|
||||
pBlockInfo->rows = pHandle->pBlock->numOfRows;
|
||||
pBlockInfo->uid = pHandle->pBlock->uid;
|
||||
//TODO: filter out unused column
|
||||
return 0;
|
||||
}
|
||||
SArray *tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) {
|
||||
int32_t sversion = pHandle->pBlock->sversion;
|
||||
SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true);
|
||||
STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion);
|
||||
SArray *pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData));
|
||||
if (pArray == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
SColumnInfoData colInfo;
|
||||
int sz = pSchemaWrapper->nCols * pSchemaWrapper->pSchema->bytes;
|
||||
colInfo.pData = malloc(sz);
|
||||
if (colInfo.pData == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for (int i = 0; i < pTschema->numOfCols; i++) {
|
||||
//TODO: filter out unused column
|
||||
taosArrayPush(pColumnIdList, &(schemaColAt(pTschema, i)->colId));
|
||||
}
|
||||
|
||||
SMemRow row;
|
||||
int32_t kvIdx;
|
||||
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
|
||||
for (int i = 0; i < pTschema->numOfCols && kvIdx < pTschema->numOfCols; i++) {
|
||||
//TODO: filter out unused column
|
||||
STColumn *pCol = schemaColAt(pTschema, i);
|
||||
void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx);
|
||||
//TODO: handle varlen
|
||||
memcpy(POINTER_SHIFT(colInfo.pData, pCol->offset), val, pCol->bytes);
|
||||
}
|
||||
}
|
||||
taosArrayPush(pArray, &colInfo);
|
||||
return pArray;
|
||||
}
|
||||
/*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t status) {*/
|
||||
/*return 0;*/
|
||||
/*}*/
|
||||
|
|
|
@ -85,7 +85,6 @@ enum {
|
|||
typedef struct STableCheckInfo {
|
||||
uint64_t tableId;
|
||||
TSKEY lastKey;
|
||||
STable* pTableObj;
|
||||
SBlockInfo* pCompInfo;
|
||||
int32_t compSize;
|
||||
int32_t numOfBlocks:29; // number of qualified data blocks not the original blocks
|
||||
|
@ -141,8 +140,6 @@ typedef struct STsdbReadHandle {
|
|||
STableBlockInfo* pDataBlockInfo;
|
||||
SDataCols *pDataCols; // in order to hold current file data block
|
||||
int32_t allocSize; // allocated data block size
|
||||
// STsdb
|
||||
// STsdbMemTable * pMemTable;
|
||||
SArray *defaultLoadColumn;// default load column
|
||||
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
|
||||
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
|
||||
|
@ -204,8 +201,8 @@ static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool load
|
|||
int16_t colId = *(int16_t*)taosArrayGet(pLocalIdList, 0);
|
||||
|
||||
// the primary timestamp column does not be included in the the specified load column list, add it
|
||||
if (loadTS && colId != 0) {
|
||||
int16_t columnId = 0;
|
||||
if (loadTS && colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
int16_t columnId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||
taosArrayInsert(pLocalIdList, 0, &columnId);
|
||||
}
|
||||
|
||||
|
@ -292,7 +289,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
|
|||
for (int32_t j = 0; j < gsize; ++j) {
|
||||
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j);
|
||||
|
||||
STableCheckInfo info = { .lastKey = pKeyInfo->lastKey, .pTableObj = pKeyInfo->pTable };
|
||||
STableCheckInfo info = { .lastKey = pKeyInfo->lastKey};
|
||||
// assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
|
||||
// info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));
|
||||
|
||||
|
@ -315,10 +312,9 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
|
|||
|
||||
// taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
|
||||
size_t gsize = taosArrayGetSize(pTableCheckInfo);
|
||||
for (int32_t i = 0; i < gsize; ++i) {
|
||||
STableCheckInfo* pInfo = (STableCheckInfo*) taosArrayGet(pTableCheckInfo, i);
|
||||
taosArrayPush(pTable, &pInfo->pTableObj);
|
||||
}
|
||||
// for (int32_t i = 0; i < gsize; ++i) {
|
||||
// STableCheckInfo* pInfo = (STableCheckInfo*) taosArrayGet(pTableCheckInfo, i);
|
||||
// }
|
||||
|
||||
*psTable = pTable;
|
||||
return pTableCheckInfo;
|
||||
|
@ -347,15 +343,11 @@ static void resetCheckInfo(STsdbReadHandle* pTsdbReadHandle) {
|
|||
// only one table, not need to sort again
|
||||
static SArray* createCheckInfoFromCheckInfo(STableCheckInfo* pCheckInfo, TSKEY skey, SArray** psTable) {
|
||||
SArray* pNew = taosArrayInit(1, sizeof(STableCheckInfo));
|
||||
SArray* pTable = taosArrayInit(1, sizeof(STable*));
|
||||
|
||||
STableCheckInfo info = { .lastKey = skey, .pTableObj = pCheckInfo->pTableObj};
|
||||
STableCheckInfo info = { .lastKey = skey};
|
||||
|
||||
info.tableId = pCheckInfo->tableId;
|
||||
taosArrayPush(pNew, &info);
|
||||
taosArrayPush(pTable, &pCheckInfo->pTableObj);
|
||||
|
||||
*psTable = pTable;
|
||||
return pNew;
|
||||
}
|
||||
|
||||
|
@ -461,9 +453,6 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
|
|||
pReadHandle->defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
|
||||
}
|
||||
|
||||
// STsdbMeta* pMeta = NULL;//tsdbGetMeta(tsdb);
|
||||
// assert(pMeta != NULL);
|
||||
|
||||
pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->config.maxRowsPerFileBlock);
|
||||
if (pReadHandle->pDataCols == NULL) {
|
||||
tsdbError("%p failed to malloc buf for pDataCols, %"PRIu64, pReadHandle, pReadHandle->qId);
|
||||
|
@ -641,12 +630,6 @@ SArray* tsdbGetQueriedTableList(tsdbReadHandleT *pHandle) {
|
|||
|
||||
size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
|
||||
SArray* res = taosArrayInit(size, POINTER_BYTES);
|
||||
|
||||
for(int32_t i = 0; i < size; ++i) {
|
||||
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
|
||||
taosArrayPush(res, &pCheckInfo->pTableObj);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
@ -1049,7 +1032,10 @@ static int32_t loadBlockInfo(STsdbReadHandle * pTsdbReadHandle, int32_t index, i
|
|||
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, index);
|
||||
pCheckInfo->numOfBlocks = 0;
|
||||
|
||||
if (tsdbSetReadTable(&pTsdbReadHandle->rhelper, pCheckInfo->pTableObj) != TSDB_CODE_SUCCESS) {
|
||||
STable table = {.uid = pCheckInfo->tableId, .tid = pCheckInfo->tableId};
|
||||
table.pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);
|
||||
|
||||
if (tsdbSetReadTable(&pTsdbReadHandle->rhelper, &table) != TSDB_CODE_SUCCESS) {
|
||||
code = terrno;
|
||||
return code;
|
||||
}
|
||||
|
@ -1149,7 +1135,7 @@ static int32_t getFileCompInfo(STsdbReadHandle* pTsdbReadHandle, int32_t* numOfB
|
|||
static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) {
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
STSchema *pSchema = NULL;//tsdbGetTableSchema(pCheckInfo->pTableObj);
|
||||
STSchema *pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);
|
||||
int32_t code = tdInitDataCols(pTsdbReadHandle->pDataCols, pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("%p failed to malloc buf for pDataCols, 0x%"PRIx64, pTsdbReadHandle, pTsdbReadHandle->qId);
|
||||
|
@ -1184,7 +1170,7 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl
|
|||
|
||||
pBlockLoadInfo->fileGroup = pTsdbReadHandle->pFileGroup;
|
||||
pBlockLoadInfo->slot = pTsdbReadHandle->cur.slot;
|
||||
pBlockLoadInfo->uid = pCheckInfo->pTableObj->uid;
|
||||
pBlockLoadInfo->uid = pCheckInfo->tableId;
|
||||
|
||||
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
|
||||
assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows);
|
||||
|
@ -1878,7 +1864,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
|||
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1;
|
||||
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
|
||||
|
||||
STable* pTable = pCheckInfo->pTableObj;
|
||||
STable* pTable = NULL;
|
||||
int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);
|
||||
|
||||
tsdbDebug("%p uid:%" PRIu64" start merge data block, file block range:%"PRIu64"-%"PRIu64" rows:%d, start:%d,"
|
||||
|
@ -1932,7 +1918,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
|||
rv2 = memRowVersion(row2);
|
||||
}
|
||||
|
||||
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pTable, pSchema1, pSchema2, true);
|
||||
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pCheckInfo->tableId, pSchema1, pSchema2, true);
|
||||
numOfRows += 1;
|
||||
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
||||
cur->win.skey = key;
|
||||
|
@ -1958,7 +1944,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
|||
}
|
||||
|
||||
bool forceSetNull = pCfg->update != TD_ROW_PARTIAL_UPDATE;
|
||||
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pTable, pSchema1, pSchema2, forceSetNull);
|
||||
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pCheckInfo->tableId, pSchema1, pSchema2, forceSetNull);
|
||||
numOfRows += 1;
|
||||
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
||||
cur->win.skey = key;
|
||||
|
@ -2745,7 +2731,7 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
|
|||
// if (ret != TSDB_CODE_SUCCESS) {
|
||||
// return false;
|
||||
// }
|
||||
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, pRow, NULL, numOfCols, pCheckInfo->pTableObj, NULL, NULL, true);
|
||||
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, pRow, NULL, numOfCols, pCheckInfo->tableId, NULL, NULL, true);
|
||||
tfree(pRow);
|
||||
|
||||
// update the last key value
|
||||
|
@ -3389,14 +3375,14 @@ SArray* tsdbRetrieveDataBlock(tsdbReadHandleT* pTsdbReadHandle, SArray* pIdList)
|
|||
if (pHandle->cur.mixBlock) {
|
||||
return pHandle->pColumns;
|
||||
} else {
|
||||
SDataBlockInfo binfo = {0};/*GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);*/
|
||||
SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);
|
||||
assert(pHandle->realNumOfRows <= binfo.rows);
|
||||
|
||||
// data block has been loaded, todo extract method
|
||||
SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo;
|
||||
|
||||
if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->fileGroup->fid == pHandle->cur.fid &&
|
||||
pBlockLoadInfo->uid == pCheckInfo->pTableObj->tid) {
|
||||
pBlockLoadInfo->uid == pCheckInfo->tableId) {
|
||||
return pHandle->pColumns;
|
||||
} else { // only load the file block
|
||||
SBlock* pBlock = pBlockInfo->compBlock;
|
||||
|
|
|
@ -551,7 +551,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32
|
|||
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds,
|
||||
int numOfColIds) {
|
||||
ASSERT(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);
|
||||
ASSERT(colIds[0] == 0);
|
||||
ASSERT(colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID);
|
||||
|
||||
SDFile * pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
|
||||
SBlockCol blockCol = {0};
|
||||
|
@ -588,7 +588,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
|
|||
if (pDataCol == NULL) continue;
|
||||
ASSERT(pDataCol->colId == colId);
|
||||
|
||||
if (colId == 0) { // load the key row
|
||||
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { // load the key row
|
||||
blockCol.colId = colId;
|
||||
blockCol.len = pBlock->keyLen;
|
||||
blockCol.type = pDataCol->type;
|
||||
|
|
|
@ -28,7 +28,7 @@ int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|||
case TDMT_VND_QUERY:
|
||||
return qWorkerProcessQueryMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
|
||||
case TDMT_VND_QUERY_CONTINUE:
|
||||
return qWorkerProcessQueryContinueMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
|
||||
return qWorkerProcessCQueryMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
|
||||
case TDMT_VND_SCHEDULE_DATA_SINK:
|
||||
return qWorkerProcessDataSinkMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
|
||||
default:
|
||||
|
|
|
@ -47,6 +47,11 @@ enum {
|
|||
CTG_RENT_STABLE,
|
||||
};
|
||||
|
||||
typedef struct SCTGDebug {
|
||||
int32_t lockDebug;
|
||||
} SCTGDebug;
|
||||
|
||||
|
||||
typedef struct SVgroupListCache {
|
||||
int32_t vgroupVersion;
|
||||
SHashObj *cache; // key:vgId, value:SVgroupInfo
|
||||
|
@ -134,20 +139,22 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
|
|||
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
|
||||
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
|
||||
|
||||
#define CTG_LOCK_DEBUG(...) do { if (gCTGDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0)
|
||||
|
||||
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
|
||||
|
||||
#define CTG_LOCK(type, _lock) do { \
|
||||
if (CTG_READ == (type)) { \
|
||||
assert(atomic_load_32((_lock)) >= 0); \
|
||||
qDebug("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
taosRLockLatch(_lock); \
|
||||
qDebug("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
assert(atomic_load_32((_lock)) > 0); \
|
||||
} else { \
|
||||
assert(atomic_load_32((_lock)) >= 0); \
|
||||
qDebug("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
taosWLockLatch(_lock); \
|
||||
qDebug("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
||||
} \
|
||||
} while (0)
|
||||
|
@ -155,15 +162,15 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
|
|||
#define CTG_UNLOCK(type, _lock) do { \
|
||||
if (CTG_READ == (type)) { \
|
||||
assert(atomic_load_32((_lock)) > 0); \
|
||||
qDebug("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
taosRUnLockLatch(_lock); \
|
||||
qDebug("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
assert(atomic_load_32((_lock)) >= 0); \
|
||||
} else { \
|
||||
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
||||
qDebug("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
taosWUnLockLatch(_lock); \
|
||||
qDebug("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
assert(atomic_load_32((_lock)) >= 0); \
|
||||
} \
|
||||
} while (0)
|
||||
|
|
|
@ -20,6 +20,9 @@
|
|||
|
||||
SCatalogMgmt ctgMgmt = {0};
|
||||
|
||||
SCTGDebug gCTGDebug = {0};
|
||||
|
||||
|
||||
int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, bool *inCache) {
|
||||
if (NULL == pCatalog->dbCache.cache) {
|
||||
*inCache = false;
|
||||
|
|
|
@ -196,7 +196,7 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
|||
pOutput->bufStatus = updateStatus(pDispatcher);
|
||||
pthread_mutex_lock(&pDispatcher->mutex);
|
||||
pOutput->queryEnd = pDispatcher->queryEnd;
|
||||
pOutput->needSchedule = false;
|
||||
pOutput->scheduleJobNo = 0;
|
||||
pOutput->useconds = pDispatcher->useconds;
|
||||
pOutput->precision = pDispatcher->schema.precision;
|
||||
pthread_mutex_unlock(&pDispatcher->mutex);
|
||||
|
|
|
@ -4927,6 +4927,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
|
|||
|
||||
SResFetchReq *pMsg = calloc(1, sizeof(SResFetchReq));
|
||||
if (NULL == pMsg) { // todo handle malloc error
|
||||
|
||||
}
|
||||
|
||||
SEpSet epSet;
|
||||
|
@ -7381,6 +7382,7 @@ int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* r
|
|||
cond.numOfCols = taosArrayGetSize(pTableScanNode->scan.node.pTargets);
|
||||
cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo));
|
||||
cond.twindow = pTableScanNode->window;
|
||||
cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER;
|
||||
|
||||
for(int32_t i = 0; i < cond.numOfCols; ++i) {
|
||||
SExprInfo* pExprInfo = taosArrayGetP(pTableScanNode->scan.node.pTargets, i);
|
||||
|
|
|
@ -27,14 +27,30 @@ extern "C" {
|
|||
#define QWORKER_DEFAULT_SCH_TASK_NUMBER 10000
|
||||
|
||||
enum {
|
||||
QW_READY_NOT_RECEIVED = 0,
|
||||
QW_READY_RECEIVED,
|
||||
QW_READY_RESPONSED,
|
||||
QW_PHASE_PRE_QUERY = 1,
|
||||
QW_PHASE_POST_QUERY,
|
||||
QW_PHASE_PRE_CQUERY,
|
||||
QW_PHASE_POST_CQUERY,
|
||||
QW_PHASE_PRE_SINK,
|
||||
QW_PHASE_POST_SINK,
|
||||
QW_PHASE_PRE_FETCH,
|
||||
QW_PHASE_POST_FETCH,
|
||||
};
|
||||
|
||||
enum {
|
||||
QW_TASK_INFO_STATUS = 1,
|
||||
QW_TASK_INFO_READY,
|
||||
QW_EVENT_CANCEL = 1,
|
||||
QW_EVENT_READY,
|
||||
QW_EVENT_FETCH,
|
||||
QW_EVENT_DROP,
|
||||
QW_EVENT_CQUERY,
|
||||
|
||||
QW_EVENT_MAX,
|
||||
};
|
||||
|
||||
enum {
|
||||
QW_EVENT_NOT_RECEIVED = 0,
|
||||
QW_EVENT_RECEIVED,
|
||||
QW_EVENT_PROCESSED,
|
||||
};
|
||||
|
||||
enum {
|
||||
|
@ -57,21 +73,45 @@ enum {
|
|||
QW_ADD_ACQUIRE,
|
||||
};
|
||||
|
||||
typedef struct SQWDebug {
|
||||
int32_t lockDebug;
|
||||
} SQWDebug;
|
||||
|
||||
typedef struct SQWMsg {
|
||||
void *node;
|
||||
char *msg;
|
||||
int32_t msgLen;
|
||||
void *connection;
|
||||
} SQWMsg;
|
||||
|
||||
typedef struct SQWPhaseInput {
|
||||
int8_t status;
|
||||
int32_t code;
|
||||
qTaskInfo_t taskHandle;
|
||||
DataSinkHandle sinkHandle;
|
||||
} SQWPhaseInput;
|
||||
|
||||
typedef struct SQWPhaseOutput {
|
||||
int32_t rspCode;
|
||||
bool needStop;
|
||||
bool needRsp;
|
||||
} SQWPhaseOutput;
|
||||
|
||||
|
||||
typedef struct SQWTaskStatus {
|
||||
SRWLatch lock;
|
||||
int32_t code;
|
||||
int8_t status;
|
||||
int8_t ready;
|
||||
bool cancel;
|
||||
bool drop;
|
||||
} SQWTaskStatus;
|
||||
|
||||
typedef struct SQWTaskCtx {
|
||||
SRWLatch lock;
|
||||
int8_t sinkScheduled;
|
||||
int8_t queryScheduled;
|
||||
int32_t phase;
|
||||
|
||||
int32_t sinkId;
|
||||
int32_t readyCode;
|
||||
|
||||
int8_t events[QW_EVENT_MAX];
|
||||
|
||||
bool needRsp;
|
||||
qTaskInfo_t taskHandle;
|
||||
DataSinkHandle sinkHandle;
|
||||
} SQWTaskCtx;
|
||||
|
@ -95,15 +135,22 @@ typedef struct SQWorkerMgmt {
|
|||
putReqToQueryQFp putToQueueFp;
|
||||
} SQWorkerMgmt;
|
||||
|
||||
#define QW_GOT_RES_DATA(data) (true)
|
||||
#define QW_LOW_RES_DATA(data) (false)
|
||||
#define QW_FPARAMS_DEF SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId
|
||||
#define QW_IDS() sId, qId, tId
|
||||
#define QW_FPARAMS() mgmt, QW_IDS()
|
||||
|
||||
#define QW_IS_EVENT_RECEIVED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_RECEIVED)
|
||||
#define QW_IS_EVENT_PROCESSED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_PROCESSED)
|
||||
#define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED)
|
||||
#define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED)
|
||||
|
||||
#define QW_IN_EXECUTOR(ctx) ((ctx)->phase == QW_PHASE_PRE_QUERY || (ctx)->phase == QW_PHASE_PRE_CQUERY || (ctx)->phase == QW_PHASE_PRE_FETCH || (ctx)->phase == QW_PHASE_PRE_SINK)
|
||||
|
||||
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
|
||||
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
|
||||
#define QW_TASK_READY(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || status == JOB_TASK_STATUS_PARTIAL_SUCCEED)
|
||||
#define QW_SET_QTID(id, qId, tId) do { *(uint64_t *)(id) = (qId); *(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); } while (0)
|
||||
#define QW_GET_QTID(id, qId, tId) do { (qId) = *(uint64_t *)(id); (tId) = *(uint64_t *)((char *)(id) + sizeof(qId)); } while (0)
|
||||
#define QW_IDS() sId, qId, tId
|
||||
|
||||
#define QW_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
|
||||
#define QW_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
|
||||
|
@ -123,21 +170,22 @@ typedef struct SQWorkerMgmt {
|
|||
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
|
||||
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
|
||||
|
||||
#define QW_LOCK_DEBUG(...) do { if (gQWDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0)
|
||||
|
||||
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
|
||||
|
||||
#define QW_LOCK(type, _lock) do { \
|
||||
if (QW_READ == (type)) { \
|
||||
assert(atomic_load_32((_lock)) >= 0); \
|
||||
qDebug("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
taosRLockLatch(_lock); \
|
||||
qDebug("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
assert(atomic_load_32((_lock)) > 0); \
|
||||
} else { \
|
||||
assert(atomic_load_32((_lock)) >= 0); \
|
||||
qDebug("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
taosWLockLatch(_lock); \
|
||||
qDebug("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
||||
} \
|
||||
} while (0)
|
||||
|
@ -145,25 +193,21 @@ typedef struct SQWorkerMgmt {
|
|||
#define QW_UNLOCK(type, _lock) do { \
|
||||
if (QW_READ == (type)) { \
|
||||
assert(atomic_load_32((_lock)) > 0); \
|
||||
qDebug("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
taosRUnLockLatch(_lock); \
|
||||
qDebug("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
assert(atomic_load_32((_lock)) >= 0); \
|
||||
} else { \
|
||||
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
||||
qDebug("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
taosWUnLockLatch(_lock); \
|
||||
qDebug("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
assert(atomic_load_32((_lock)) >= 0); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
|
||||
|
||||
int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch);
|
||||
int32_t qwAcquireAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch);
|
||||
int32_t qwAcquireTask(SQWorkerMgmt *mgmt, int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, SQWTaskStatus **task);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_QWORKER_MSG_H_
|
||||
#define _TD_QWORKER_MSG_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "qworkerInt.h"
|
||||
#include "dataSinkMgt.h"
|
||||
|
||||
int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
|
||||
int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
|
||||
int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
|
||||
int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
|
||||
int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
|
||||
|
||||
int32_t qwBuildAndSendDropRsp(void *connection, int32_t code);
|
||||
int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code);
|
||||
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len);
|
||||
int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection);
|
||||
int32_t qwBuildAndSendSchSinkMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection);
|
||||
int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code);
|
||||
int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code);
|
||||
void qwFreeFetchRsp(void *msg);
|
||||
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
|
||||
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_QWORKER_INT_H_*/
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,553 @@
|
|||
#include "qworker.h"
|
||||
#include <common.h>
|
||||
#include "executor.h"
|
||||
#include "planner.h"
|
||||
#include "query.h"
|
||||
#include "qworkerInt.h"
|
||||
#include "qworkerMsg.h"
|
||||
#include "tmsg.h"
|
||||
#include "tname.h"
|
||||
#include "dataSinkMgt.h"
|
||||
|
||||
|
||||
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
|
||||
int32_t msgSize = sizeof(SRetrieveTableRsp) + length;
|
||||
|
||||
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(msgSize);
|
||||
if (NULL == pRsp) {
|
||||
qError("rpcMallocCont %d failed", msgSize);
|
||||
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
|
||||
|
||||
*rsp = pRsp;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len) {
|
||||
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
|
||||
|
||||
rsp->useconds = htobe64(input->useconds);
|
||||
rsp->completed = input->queryEnd;
|
||||
rsp->precision = input->precision;
|
||||
rsp->compressed = input->compressed;
|
||||
rsp->compLen = htonl(len);
|
||||
rsp->numOfRows = htonl(input->numOfRows);
|
||||
}
|
||||
|
||||
|
||||
void qwFreeFetchRsp(void *msg) {
|
||||
if (msg) {
|
||||
rpcFreeCont(msg);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code) {
|
||||
SRpcMsg *pMsg = (SRpcMsg *)connection;
|
||||
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
|
||||
pRsp->code = code;
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
.handle = pMsg->handle,
|
||||
.ahandle = pMsg->ahandle,
|
||||
.pCont = pRsp,
|
||||
.contLen = sizeof(*pRsp),
|
||||
.code = code,
|
||||
};
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code) {
|
||||
SRpcMsg *pMsg = (SRpcMsg *)connection;
|
||||
SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp));
|
||||
pRsp->code = code;
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
.handle = pMsg->handle,
|
||||
.ahandle = pMsg->ahandle,
|
||||
.pCont = pRsp,
|
||||
.contLen = sizeof(*pRsp),
|
||||
.code = code,
|
||||
};
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) {
|
||||
int32_t size = 0;
|
||||
|
||||
if (sStatus) {
|
||||
size = sizeof(SSchedulerStatusRsp) + sizeof(sStatus->status[0]) * sStatus->num;
|
||||
} else {
|
||||
size = sizeof(SSchedulerStatusRsp);
|
||||
}
|
||||
|
||||
SSchedulerStatusRsp *pRsp = (SSchedulerStatusRsp *)rpcMallocCont(size);
|
||||
|
||||
if (sStatus) {
|
||||
memcpy(pRsp, sStatus, size);
|
||||
} else {
|
||||
pRsp->num = 0;
|
||||
}
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
.msgType = pMsg->msgType + 1,
|
||||
.handle = pMsg->handle,
|
||||
.ahandle = pMsg->ahandle,
|
||||
.pCont = pRsp,
|
||||
.contLen = size,
|
||||
.code = 0,
|
||||
};
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
|
||||
SRpcMsg *pMsg = (SRpcMsg *)connection;
|
||||
|
||||
if (NULL == pRsp) {
|
||||
pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
||||
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
|
||||
dataLength = 0;
|
||||
}
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
.handle = pMsg->handle,
|
||||
.ahandle = pMsg->ahandle,
|
||||
.pCont = pRsp,
|
||||
.contLen = sizeof(*pRsp) + dataLength,
|
||||
.code = code,
|
||||
};
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) {
|
||||
STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
|
||||
pRsp->code = code;
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
.handle = pMsg->handle,
|
||||
.ahandle = pMsg->ahandle,
|
||||
.pCont = pRsp,
|
||||
.contLen = sizeof(*pRsp),
|
||||
.code = code,
|
||||
};
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwBuildAndSendDropRsp(void *connection, int32_t code) {
|
||||
SRpcMsg *pMsg = (SRpcMsg *)connection;
|
||||
STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
|
||||
pRsp->code = code;
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
.handle = pMsg->handle,
|
||||
.ahandle = pMsg->ahandle,
|
||||
.pCont = pRsp,
|
||||
.contLen = sizeof(*pRsp),
|
||||
.code = code,
|
||||
};
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
|
||||
int32_t numOfCols = 6;
|
||||
int32_t msgSize = sizeof(SVShowTablesRsp) + sizeof(SSchema) * numOfCols;
|
||||
|
||||
SVShowTablesRsp *pRsp = (SVShowTablesRsp *)rpcMallocCont(msgSize);
|
||||
|
||||
int32_t cols = 0;
|
||||
SSchema *pSchema = pRsp->metaInfo.pSchema;
|
||||
|
||||
const SSchema *s = tGetTbnameColumnSchema();
|
||||
*pSchema = createSchema(s->type, htonl(s->bytes), htonl(++cols), "name");
|
||||
pSchema++;
|
||||
|
||||
int32_t type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(++cols), "created");
|
||||
pSchema++;
|
||||
|
||||
type = TSDB_DATA_TYPE_SMALLINT;
|
||||
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(++cols), "columns");
|
||||
pSchema++;
|
||||
|
||||
*pSchema = createSchema(s->type, htonl(s->bytes), htonl(++cols), "stable");
|
||||
pSchema++;
|
||||
|
||||
type = TSDB_DATA_TYPE_BIGINT;
|
||||
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(++cols), "uid");
|
||||
pSchema++;
|
||||
|
||||
type = TSDB_DATA_TYPE_INT;
|
||||
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(++cols), "vgId");
|
||||
|
||||
assert(cols == numOfCols);
|
||||
pRsp->metaInfo.numOfColumns = htonl(cols);
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
.handle = pMsg->handle,
|
||||
.ahandle = pMsg->ahandle,
|
||||
.pCont = pRsp,
|
||||
.contLen = msgSize,
|
||||
.code = code,
|
||||
};
|
||||
|
||||
rpcSendResponse(&rpcMsg);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) {
|
||||
SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp));
|
||||
int32_t handle = htonl(pFetchReq->id);
|
||||
|
||||
pRsp->numOfRows = 0;
|
||||
SRpcMsg rpcMsg = {
|
||||
.handle = pMsg->handle,
|
||||
.ahandle = pMsg->ahandle,
|
||||
.pCont = pRsp,
|
||||
.contLen = sizeof(*pRsp),
|
||||
.code = 0,
|
||||
};
|
||||
|
||||
rpcSendResponse(&rpcMsg);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t qwBuildAndSendSchSinkMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection) {
|
||||
SRpcMsg *pMsg = (SRpcMsg *)connection;
|
||||
SSinkDataReq * req = (SSinkDataReq *)rpcMallocCont(sizeof(SSinkDataReq));
|
||||
if (NULL == req) {
|
||||
qError("rpcMallocCont %d failed", (int32_t)sizeof(SSinkDataReq));
|
||||
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
req->header.vgId = mgmt->nodeId;
|
||||
req->sId = sId;
|
||||
req->queryId = qId;
|
||||
req->taskId = tId;
|
||||
|
||||
SRpcMsg pNewMsg = {
|
||||
.handle = pMsg->handle,
|
||||
.ahandle = pMsg->ahandle,
|
||||
.msgType = TDMT_VND_SCHEDULE_DATA_SINK,
|
||||
.pCont = req,
|
||||
.contLen = sizeof(SSinkDataReq),
|
||||
.code = 0,
|
||||
};
|
||||
|
||||
int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
qError("put data sink schedule msg to queue failed, code:%x", code);
|
||||
rpcFreeCont(req);
|
||||
QW_ERR_RET(code);
|
||||
}
|
||||
|
||||
qDebug("put data sink schedule msg to query queue");
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection) {
|
||||
SRpcMsg *pMsg = (SRpcMsg *)connection;
|
||||
SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
|
||||
if (NULL == req) {
|
||||
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
|
||||
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
req->header.vgId = mgmt->nodeId;
|
||||
req->sId = sId;
|
||||
req->queryId = qId;
|
||||
req->taskId = tId;
|
||||
|
||||
SRpcMsg pNewMsg = {
|
||||
.handle = pMsg->handle,
|
||||
.ahandle = pMsg->ahandle,
|
||||
.msgType = TDMT_VND_QUERY_CONTINUE,
|
||||
.pCont = req,
|
||||
.contLen = sizeof(SQueryContinueReq),
|
||||
.code = 0,
|
||||
};
|
||||
|
||||
int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
QW_SCH_TASK_ELOG("put query continue msg to queue failed, code:%x", code);
|
||||
rpcFreeCont(req);
|
||||
QW_ERR_RET(code);
|
||||
}
|
||||
|
||||
QW_SCH_TASK_DLOG("put query continue msg to query queue, vgId:%d", mgmt->nodeId);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
SSubQueryMsg *msg = pMsg->pCont;
|
||||
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
|
||||
|
||||
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
|
||||
QW_ELOG("invalid query msg, contLen:%d", pMsg->contLen);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
msg->sId = be64toh(msg->sId);
|
||||
msg->queryId = be64toh(msg->queryId);
|
||||
msg->taskId = be64toh(msg->taskId);
|
||||
msg->contentLen = ntohl(msg->contentLen);
|
||||
|
||||
uint64_t sId = msg->sId;
|
||||
uint64_t qId = msg->queryId;
|
||||
uint64_t tId = msg->taskId;
|
||||
|
||||
SQWMsg qwMsg = {.node = node, .msg = msg->msg, .msgLen = msg->contentLen, .connection = pMsg};
|
||||
|
||||
QW_SCH_TASK_DLOG("processQuery start, node:%p", node);
|
||||
|
||||
QW_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg));
|
||||
|
||||
QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
int8_t status = 0;
|
||||
bool queryDone = false;
|
||||
SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont;
|
||||
bool needStop = false;
|
||||
SQWTaskCtx *handles = NULL;
|
||||
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
|
||||
|
||||
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
|
||||
QW_ELOG("invalid cquery msg, contLen:%d", pMsg->contLen);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
msg->sId = be64toh(msg->sId);
|
||||
msg->queryId = be64toh(msg->queryId);
|
||||
msg->taskId = be64toh(msg->taskId);
|
||||
|
||||
uint64_t sId = msg->sId;
|
||||
uint64_t qId = msg->queryId;
|
||||
uint64_t tId = msg->taskId;
|
||||
|
||||
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
|
||||
|
||||
QW_SCH_TASK_DLOG("processCQuery start, node:%p", node);
|
||||
|
||||
QW_ERR_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg));
|
||||
|
||||
QW_SCH_TASK_DLOG("processCQuery end, node:%p", node);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
int32_t qWorkerProcessDataSinkMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
SSinkDataReq *msg = pMsg->pCont;
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
qError("invalid sink data msg");
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
//dsScheduleProcess();
|
||||
//TODO
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
SResReadyReq *msg = pMsg->pCont;
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
qError("invalid task status msg");
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
|
||||
|
||||
msg->sId = be64toh(msg->sId);
|
||||
msg->queryId = be64toh(msg->queryId);
|
||||
msg->taskId = be64toh(msg->taskId);
|
||||
|
||||
uint64_t sId = msg->sId;
|
||||
uint64_t qId = msg->queryId;
|
||||
uint64_t tId = msg->taskId;
|
||||
|
||||
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
|
||||
|
||||
QW_SCH_TASK_DLOG("processReady start, node:%p", node);
|
||||
|
||||
QW_ERR_RET(qwProcessReady(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, &qwMsg));
|
||||
|
||||
QW_SCH_TASK_DLOG("processReady end, node:%p", node);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
SSchTasksStatusReq *msg = pMsg->pCont;
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
qError("invalid task status msg");
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
msg->sId = htobe64(msg->sId);
|
||||
|
||||
SSchedulerStatusRsp *sStatus = NULL;
|
||||
|
||||
//QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus));
|
||||
|
||||
_return:
|
||||
|
||||
QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
SResFetchReq *msg = pMsg->pCont;
|
||||
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
|
||||
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
msg->sId = be64toh(msg->sId);
|
||||
msg->queryId = be64toh(msg->queryId);
|
||||
msg->taskId = be64toh(msg->taskId);
|
||||
|
||||
uint64_t sId = msg->sId;
|
||||
uint64_t qId = msg->queryId;
|
||||
uint64_t tId = msg->taskId;
|
||||
|
||||
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
|
||||
|
||||
QW_SCH_TASK_DLOG("processFetch start, node:%p", node);
|
||||
|
||||
QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg));
|
||||
|
||||
QW_SCH_TASK_DLOG("processFetch end, node:%p", node);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
STaskCancelReq *msg = pMsg->pCont;
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
qError("invalid task cancel msg");
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
msg->sId = htobe64(msg->sId);
|
||||
msg->queryId = htobe64(msg->queryId);
|
||||
msg->taskId = htobe64(msg->taskId);
|
||||
|
||||
//QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
|
||||
|
||||
_return:
|
||||
|
||||
QW_ERR_RET(qwBuildAndSendCancelRsp(pMsg, code));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
STaskDropReq *msg = pMsg->pCont;
|
||||
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
|
||||
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
msg->sId = be64toh(msg->sId);
|
||||
msg->queryId = be64toh(msg->queryId);
|
||||
msg->taskId = be64toh(msg->taskId);
|
||||
|
||||
uint64_t sId = msg->sId;
|
||||
uint64_t qId = msg->queryId;
|
||||
uint64_t tId = msg->taskId;
|
||||
|
||||
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
|
||||
|
||||
QW_SCH_TASK_DLOG("processDrop start, node:%p", node);
|
||||
|
||||
QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg));
|
||||
|
||||
QW_SCH_TASK_DLOG("processDrop end, node:%p", node);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
SVShowTablesReq *pReq = pMsg->pCont;
|
||||
QW_ERR_RET(qwBuildAndSendShowRsp(pMsg, code));
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
SVShowTablesFetchReq *pFetchReq = pMsg->pCont;
|
||||
QW_ERR_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq));
|
||||
}
|
||||
|
||||
|
|
@ -1374,6 +1374,83 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag,
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
|
||||
if (NULL == pDag || pDag->numOfSubplans <= 0 || taosArrayGetSize(pDag->pSubplans) <= 0) {
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
int32_t levelNum = taosArrayGetSize(pDag->pSubplans);
|
||||
if (1 != levelNum) {
|
||||
qError("invalid level num: %d", levelNum);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SArray *plans = taosArrayGet(pDag->pSubplans, 0);
|
||||
int32_t taskNum = taosArrayGetSize(plans);
|
||||
if (taskNum <= 0) {
|
||||
qError("invalid task num: %d", taskNum);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SArray *info = taosArrayInit(taskNum, sizeof(STaskInfo));
|
||||
if (NULL == info) {
|
||||
qError("taosArrayInit %d taskInfo failed", taskNum);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
STaskInfo tInfo = {0};
|
||||
char *msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
int32_t code = 0;
|
||||
|
||||
for (int32_t i = 0; i < taskNum; ++i) {
|
||||
SSubplan *plan = taosArrayGetP(plans, i);
|
||||
|
||||
tInfo.addr = plan->execNode;
|
||||
|
||||
code = qSubPlanToString(plan, &msg, &msgLen);
|
||||
if (TSDB_CODE_SUCCESS != code || NULL == msg || msgLen <= 0) {
|
||||
qError("subplanToString error, code:%x, msg:%p, len:%d", code, msg, msgLen);
|
||||
SCH_ERR_JRET(code);
|
||||
}
|
||||
|
||||
int32_t msgSize = sizeof(SSubQueryMsg) + msgLen;
|
||||
msg = calloc(1, msgSize);
|
||||
if (NULL == msg) {
|
||||
qError("calloc %d failed", msgSize);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SSubQueryMsg *pMsg = msg;
|
||||
|
||||
pMsg->header.vgId = htonl(tInfo.addr.nodeId);
|
||||
|
||||
pMsg->sId = htobe64(schMgmt.sId);
|
||||
pMsg->queryId = htobe64(plan->id.queryId);
|
||||
pMsg->taskId = htobe64(atomic_add_fetch_64(&schMgmt.taskId, 1));
|
||||
pMsg->contentLen = htonl(msgLen);
|
||||
memcpy(pMsg->msg, msg, msgLen);
|
||||
|
||||
tInfo.msg = pMsg;
|
||||
|
||||
if (NULL == taosArrayPush(info, &tInfo)) {
|
||||
qError("taosArrayPush failed, idx:%d", i);
|
||||
free(msg);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
||||
*pTasks = info;
|
||||
info = NULL;
|
||||
|
||||
_return:
|
||||
|
||||
schedulerFreeTaskList(info);
|
||||
|
||||
SCH_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
|
||||
if (NULL == pJob || NULL == pData) {
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
|
@ -1521,6 +1598,20 @@ void scheduleFreeJob(void *job) {
|
|||
|
||||
qDebug("QID:%"PRIx64" job freed", queryId);
|
||||
}
|
||||
|
||||
void schedulerFreeTaskList(SArray *taskList) {
|
||||
if (NULL == taskList) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t taskNum = taosArrayGetSize(taskList);
|
||||
for (int32_t i = 0; i < taskNum; ++i) {
|
||||
STaskInfo *info = taosArrayGet(taskList, i);
|
||||
tfree(info->msg);
|
||||
}
|
||||
|
||||
taosArrayDestroy(taskList);
|
||||
}
|
||||
|
||||
void schedulerDestroy(void) {
|
||||
if (schMgmt.jobs) {
|
||||
|
|
|
@ -24,13 +24,49 @@ extern "C" {
|
|||
|
||||
extern int32_t rpcDebugFlag;
|
||||
|
||||
#define tFatal(...) { if (rpcDebugFlag & DEBUG_FATAL) { taosPrintLog("RPC FATAL ", rpcDebugFlag, __VA_ARGS__); }}
|
||||
#define tError(...) { if (rpcDebugFlag & DEBUG_ERROR) { taosPrintLog("RPC ERROR ", rpcDebugFlag, __VA_ARGS__); }}
|
||||
#define tWarn(...) { if (rpcDebugFlag & DEBUG_WARN) { taosPrintLog("RPC WARN ", rpcDebugFlag, __VA_ARGS__); }}
|
||||
#define tInfo(...) { if (rpcDebugFlag & DEBUG_INFO) { taosPrintLog("RPC ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }}
|
||||
#define tDebug(...) { if (rpcDebugFlag & DEBUG_DEBUG) { taosPrintLog("RPC ", rpcDebugFlag, __VA_ARGS__); }}
|
||||
#define tTrace(...) { if (rpcDebugFlag & DEBUG_TRACE) { taosPrintLog("RPC ", rpcDebugFlag, __VA_ARGS__); }}
|
||||
#define tDump(x, y) { if (rpcDebugFlag & DEBUG_DUMP) { taosDumpData((unsigned char *)x, y); }}
|
||||
// rpcDebugFlag = 143
|
||||
#define tFatal(...) \
|
||||
{ \
|
||||
if (rpcDebugFlag & DEBUG_FATAL) { \
|
||||
taosPrintLog("RPC FATAL ", rpcDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
#define tError(...) \
|
||||
{ \
|
||||
if (rpcDebugFlag & DEBUG_ERROR) { \
|
||||
taosPrintLog("RPC ERROR ", rpcDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
#define tWarn(...) \
|
||||
{ \
|
||||
if (rpcDebugFlag & DEBUG_WARN) { \
|
||||
taosPrintLog("RPC WARN ", rpcDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
#define tInfo(...) \
|
||||
{ \
|
||||
if (rpcDebugFlag & DEBUG_INFO) { \
|
||||
taosPrintLog("RPC ", rpcDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
#define tDebug(...) \
|
||||
{ \
|
||||
if (rpcDebugFlag & DEBUG_DEBUG) { \
|
||||
taosPrintLog("RPC ", rpcDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
#define tTrace(...) \
|
||||
{ \
|
||||
if (rpcDebugFlag & DEBUG_TRACE) { \
|
||||
taosPrintLog("RPC ", rpcDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
#define tDump(x, y) \
|
||||
{ \
|
||||
if (rpcDebugFlag & DEBUG_DUMP) { \
|
||||
taosDumpData((unsigned char *)x, y); \
|
||||
} \
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -101,6 +101,13 @@ typedef struct SThreadObj {
|
|||
void* shandle;
|
||||
} SThreadObj;
|
||||
|
||||
typedef struct SClientObj {
|
||||
char label[TSDB_LABEL_LEN];
|
||||
int32_t index;
|
||||
int numOfThreads;
|
||||
SThreadObj** pThreadObj;
|
||||
} SClientObj;
|
||||
|
||||
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
|
||||
#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead)))
|
||||
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
|
||||
|
@ -113,7 +120,7 @@ typedef struct SServerObj {
|
|||
uv_tcp_t server;
|
||||
uv_loop_t* loop;
|
||||
int workerIdx;
|
||||
int numOfThread;
|
||||
int numOfThreads;
|
||||
SThreadObj** pThreadObj;
|
||||
uv_pipe_t** pipe;
|
||||
uint32_t ip;
|
||||
|
@ -179,18 +186,37 @@ static void* acceptThread(void* arg);
|
|||
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
|
||||
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
|
||||
|
||||
void* (*taosHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = {taosInitServer, taosInitClient};
|
||||
|
||||
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
|
||||
SClientObj* cli = calloc(1, sizeof(SClientObj));
|
||||
memcpy(cli->label, label, strlen(label));
|
||||
cli->numOfThreads = numOfThreads;
|
||||
cli->pThreadObj = (SThreadObj**)calloc(cli->numOfThreads, sizeof(SThreadObj*));
|
||||
|
||||
for (int i = 0; i < cli->numOfThreads; i++) {
|
||||
SThreadObj* thrd = (SThreadObj*)calloc(1, sizeof(SThreadObj));
|
||||
|
||||
int err = pthread_create(&thrd->thread, NULL, workerThread, (void*)(thrd));
|
||||
if (err == 0) {
|
||||
tDebug("sucess to create tranport-client thread %d", i);
|
||||
}
|
||||
}
|
||||
return cli;
|
||||
}
|
||||
|
||||
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
|
||||
SServerObj* srv = calloc(1, sizeof(SServerObj));
|
||||
srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
|
||||
srv->numOfThread = numOfThreads;
|
||||
srv->numOfThreads = numOfThreads;
|
||||
srv->workerIdx = 0;
|
||||
srv->pThreadObj = (SThreadObj**)calloc(srv->numOfThread, sizeof(SThreadObj*));
|
||||
srv->pipe = (uv_pipe_t**)calloc(srv->numOfThread, sizeof(uv_pipe_t*));
|
||||
srv->pThreadObj = (SThreadObj**)calloc(srv->numOfThreads, sizeof(SThreadObj*));
|
||||
srv->pipe = (uv_pipe_t**)calloc(srv->numOfThreads, sizeof(uv_pipe_t*));
|
||||
srv->ip = ip;
|
||||
srv->port = port;
|
||||
uv_loop_init(srv->loop);
|
||||
|
||||
for (int i = 0; i < srv->numOfThread; i++) {
|
||||
for (int i = 0; i < srv->numOfThreads; i++) {
|
||||
SThreadObj* thrd = (SThreadObj*)calloc(1, sizeof(SThreadObj));
|
||||
srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
|
||||
int fds[2];
|
||||
|
@ -299,8 +325,7 @@ static int uvAuthMsg(SRpcConn* pConn, char* msg, int len) {
|
|||
if (!rpcIsReq(pHead->msgType)) {
|
||||
// for response, if code is auth failure, it shall bypass the auth process
|
||||
code = htonl(pHead->code);
|
||||
if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE ||
|
||||
code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED ||
|
||||
if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE || code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED ||
|
||||
code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) {
|
||||
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
|
||||
// tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
|
||||
|
@ -460,7 +485,7 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
|
|||
|
||||
uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
|
||||
|
||||
pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread;
|
||||
pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;
|
||||
tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
|
||||
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnWriteCb);
|
||||
} else {
|
||||
|
@ -587,7 +612,9 @@ void* rpcOpen(const SRpcInit* pInit) {
|
|||
tstrncpy(pRpc->label, pInit->label, strlen(pInit->label));
|
||||
}
|
||||
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
||||
pRpc->tcphandle = taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
||||
pRpc->connType = pInit->connType;
|
||||
pRpc->tcphandle = (*taosHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
||||
// pRpc->taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
||||
return pRpc;
|
||||
}
|
||||
void rpcClose(void* arg) { return; }
|
||||
|
|
|
@ -1,8 +1,19 @@
|
|||
add_executable(transportTest "")
|
||||
add_executable(client "")
|
||||
add_executable(server "")
|
||||
|
||||
target_sources(transportTest
|
||||
PRIVATE
|
||||
"transportTests.cc"
|
||||
)
|
||||
target_sources (client
|
||||
PRIVATE
|
||||
"rclient.c"
|
||||
)
|
||||
target_sources (server
|
||||
PRIVATE
|
||||
"rserver.c"
|
||||
)
|
||||
|
||||
target_include_directories(transportTest
|
||||
PUBLIC
|
||||
|
@ -18,4 +29,30 @@ target_link_libraries (transportTest
|
|||
transport
|
||||
)
|
||||
|
||||
target_include_directories(client
|
||||
PUBLIC
|
||||
"${CMAKE_SOURCE_DIR}/include/libs/transport"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
|
||||
target_link_libraries (client
|
||||
os
|
||||
util
|
||||
common
|
||||
gtest_main
|
||||
transport
|
||||
)
|
||||
target_include_directories(server
|
||||
PUBLIC
|
||||
"${CMAKE_SOURCE_DIR}/include/libs/transport"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
|
||||
target_link_libraries (server
|
||||
os
|
||||
util
|
||||
common
|
||||
gtest_main
|
||||
transport
|
||||
)
|
||||
|
||||
|
|
|
@ -0,0 +1,196 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "os.h"
|
||||
#include "rpcLog.h"
|
||||
#include "taoserror.h"
|
||||
#include "tglobal.h"
|
||||
#include "trpc.h"
|
||||
#include "tutil.h"
|
||||
|
||||
typedef struct {
|
||||
int index;
|
||||
SEpSet epSet;
|
||||
int num;
|
||||
int numOfReqs;
|
||||
int msgSize;
|
||||
tsem_t rspSem;
|
||||
tsem_t * pOverSem;
|
||||
pthread_t thread;
|
||||
void * pRpc;
|
||||
} SInfo;
|
||||
|
||||
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
SInfo *pInfo = (SInfo *)pMsg->ahandle;
|
||||
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code);
|
||||
|
||||
if (pEpSet) pInfo->epSet = *pEpSet;
|
||||
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
tsem_post(&pInfo->rspSem);
|
||||
}
|
||||
|
||||
static int tcount = 0;
|
||||
|
||||
static void *sendRequest(void *param) {
|
||||
SInfo * pInfo = (SInfo *)param;
|
||||
SRpcMsg rpcMsg = {0};
|
||||
|
||||
tDebug("thread:%d, start to send request", pInfo->index);
|
||||
|
||||
while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
|
||||
pInfo->num++;
|
||||
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
|
||||
rpcMsg.contLen = pInfo->msgSize;
|
||||
rpcMsg.ahandle = pInfo;
|
||||
rpcMsg.msgType = 1;
|
||||
tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
|
||||
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
|
||||
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
|
||||
tsem_wait(&pInfo->rspSem);
|
||||
}
|
||||
|
||||
tDebug("thread:%d, it is over", pInfo->index);
|
||||
tcount++;
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
SRpcInit rpcInit;
|
||||
SEpSet epSet;
|
||||
int msgSize = 128;
|
||||
int numOfReqs = 0;
|
||||
int appThreads = 1;
|
||||
char serverIp[40] = "127.0.0.1";
|
||||
char secret[20] = "mypassword";
|
||||
struct timeval systemTime;
|
||||
int64_t startTime, endTime;
|
||||
pthread_attr_t thattr;
|
||||
|
||||
// server info
|
||||
epSet.numOfEps = 1;
|
||||
epSet.inUse = 0;
|
||||
epSet.port[0] = 7000;
|
||||
epSet.port[1] = 7000;
|
||||
strcpy(epSet.fqdn[0], serverIp);
|
||||
strcpy(epSet.fqdn[1], "192.168.0.1");
|
||||
|
||||
// client info
|
||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||
rpcInit.localPort = 0;
|
||||
rpcInit.label = "APP";
|
||||
rpcInit.numOfThreads = 1;
|
||||
rpcInit.cfp = processResponse;
|
||||
rpcInit.sessions = 100;
|
||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||
rpcInit.user = "michael";
|
||||
rpcInit.secret = secret;
|
||||
rpcInit.ckey = "key";
|
||||
rpcInit.spi = 1;
|
||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||
|
||||
for (int i = 1; i < argc; ++i) {
|
||||
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
||||
epSet.port[0] = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) {
|
||||
tstrncpy(epSet.fqdn[0], argv[++i], sizeof(epSet.fqdn[0]));
|
||||
} else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) {
|
||||
rpcInit.numOfThreads = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) {
|
||||
msgSize = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-s") == 0 && i < argc - 1) {
|
||||
rpcInit.sessions = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-n") == 0 && i < argc - 1) {
|
||||
numOfReqs = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-a") == 0 && i < argc - 1) {
|
||||
appThreads = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-o") == 0 && i < argc - 1) {
|
||||
tsCompressMsgSize = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
|
||||
rpcInit.user = argv[++i];
|
||||
} else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
|
||||
rpcInit.secret = argv[++i];
|
||||
} else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
|
||||
rpcInit.spi = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
|
||||
rpcDebugFlag = atoi(argv[++i]);
|
||||
} else {
|
||||
printf("\nusage: %s [options] \n", argv[0]);
|
||||
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
|
||||
printf(" [-p port]: server port number, default is:%d\n", epSet.port[0]);
|
||||
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
|
||||
printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
|
||||
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
|
||||
printf(" [-a threads]: number of app threads, default is:%d\n", appThreads);
|
||||
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
|
||||
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
|
||||
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
|
||||
printf(" [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
|
||||
printf(" [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
|
||||
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
|
||||
printf(" [-h help]: print out this help\n\n");
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
taosInitLog("client.log", 100000, 10);
|
||||
|
||||
void *pRpc = rpcOpen(&rpcInit);
|
||||
if (pRpc == NULL) {
|
||||
tError("failed to initialize RPC");
|
||||
return -1;
|
||||
}
|
||||
|
||||
tInfo("client is initialized");
|
||||
tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs);
|
||||
|
||||
// gettimeofday(&systemTime, NULL);
|
||||
// startTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
|
||||
SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo) * appThreads);
|
||||
|
||||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
for (int i = 0; i < appThreads; ++i) {
|
||||
pInfo->index = i;
|
||||
pInfo->epSet = epSet;
|
||||
pInfo->numOfReqs = numOfReqs;
|
||||
pInfo->msgSize = msgSize;
|
||||
tsem_init(&pInfo->rspSem, 0, 0);
|
||||
pInfo->pRpc = pRpc;
|
||||
pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
|
||||
pInfo++;
|
||||
}
|
||||
|
||||
do {
|
||||
usleep(1);
|
||||
} while (tcount < appThreads);
|
||||
|
||||
// gettimeofday(&systemTime, NULL);
|
||||
// endTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
// float usedTime = (endTime - startTime) / 1000.0f; // mseconds
|
||||
|
||||
// tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads);
|
||||
// tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
|
||||
|
||||
int ch = getchar();
|
||||
UNUSED(ch);
|
||||
|
||||
taosCloseLog();
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -0,0 +1,188 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
//#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "rpcLog.h"
|
||||
#include "tglobal.h"
|
||||
#include "tqueue.h"
|
||||
#include "trpc.h"
|
||||
|
||||
int msgSize = 128;
|
||||
int commit = 0;
|
||||
int dataFd = -1;
|
||||
STaosQueue *qhandle = NULL;
|
||||
STaosQset * qset = NULL;
|
||||
|
||||
void processShellMsg() {
|
||||
static int num = 0;
|
||||
STaosQall *qall;
|
||||
SRpcMsg * pRpcMsg, rpcMsg;
|
||||
int type;
|
||||
void * pvnode;
|
||||
|
||||
qall = taosAllocateQall();
|
||||
|
||||
while (1) {
|
||||
int numOfMsgs = taosReadAllQitemsFromQset(qset, qall, &pvnode, NULL);
|
||||
tDebug("%d shell msgs are received", numOfMsgs);
|
||||
if (numOfMsgs <= 0) break;
|
||||
|
||||
for (int i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(qall, (void **)&pRpcMsg);
|
||||
|
||||
if (dataFd >= 0) {
|
||||
if (write(dataFd, pRpcMsg->pCont, pRpcMsg->contLen) < 0) {
|
||||
tInfo("failed to write data file, reason:%s", strerror(errno));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (commit >= 2) {
|
||||
num += numOfMsgs;
|
||||
// if (taosFsync(dataFd) < 0) {
|
||||
// tInfo("failed to flush data to file, reason:%s", strerror(errno));
|
||||
//}
|
||||
|
||||
if (num % 10000 == 0) {
|
||||
tInfo("%d request have been written into disk", num);
|
||||
}
|
||||
}
|
||||
|
||||
taosResetQitems(qall);
|
||||
for (int i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(qall, (void **)&pRpcMsg);
|
||||
rpcFreeCont(pRpcMsg->pCont);
|
||||
|
||||
memset(&rpcMsg, 0, sizeof(rpcMsg));
|
||||
rpcMsg.pCont = rpcMallocCont(msgSize);
|
||||
rpcMsg.contLen = msgSize;
|
||||
rpcMsg.handle = pRpcMsg->handle;
|
||||
rpcMsg.code = 0;
|
||||
rpcSendResponse(&rpcMsg);
|
||||
|
||||
taosFreeQitem(pRpcMsg);
|
||||
}
|
||||
}
|
||||
|
||||
taosFreeQall(qall);
|
||||
}
|
||||
|
||||
int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) {
|
||||
// app shall retrieve the auth info based on meterID from DB or a data file
|
||||
// demo code here only for simple demo
|
||||
int ret = 0;
|
||||
|
||||
if (strcmp(meterId, "michael") == 0) {
|
||||
*spi = 1;
|
||||
*encrypt = 0;
|
||||
strcpy(secret, "mypassword");
|
||||
strcpy(ckey, "key");
|
||||
} else if (strcmp(meterId, "jeff") == 0) {
|
||||
*spi = 0;
|
||||
*encrypt = 0;
|
||||
} else {
|
||||
ret = -1; // user not there
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
SRpcMsg *pTemp;
|
||||
|
||||
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
|
||||
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
|
||||
|
||||
tDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
|
||||
taosWriteQitem(qhandle, pTemp);
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
SRpcInit rpcInit;
|
||||
char dataName[20] = "server.data";
|
||||
|
||||
taosBlockSIGPIPE();
|
||||
|
||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||
rpcInit.localPort = 7000;
|
||||
rpcInit.label = "SER";
|
||||
rpcInit.numOfThreads = 1;
|
||||
rpcInit.cfp = processRequestMsg;
|
||||
rpcInit.sessions = 1000;
|
||||
rpcInit.idleTime = tsShellActivityTimer * 1500;
|
||||
rpcInit.afp = retrieveAuthInfo;
|
||||
|
||||
for (int i = 1; i < argc; ++i) {
|
||||
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
||||
rpcInit.localPort = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) {
|
||||
rpcInit.numOfThreads = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) {
|
||||
msgSize = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-s") == 0 && i < argc - 1) {
|
||||
rpcInit.sessions = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-o") == 0 && i < argc - 1) {
|
||||
tsCompressMsgSize = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-w") == 0 && i < argc - 1) {
|
||||
commit = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
|
||||
rpcDebugFlag = atoi(argv[++i]);
|
||||
dDebugFlag = rpcDebugFlag;
|
||||
uDebugFlag = rpcDebugFlag;
|
||||
} else {
|
||||
printf("\nusage: %s [options] \n", argv[0]);
|
||||
printf(" [-p port]: server port number, default is:%d\n", rpcInit.localPort);
|
||||
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
|
||||
printf(" [-s sessions]: number of sessions, default is:%d\n", rpcInit.sessions);
|
||||
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
|
||||
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
|
||||
printf(" [-w write]: write received data to file(0, 1, 2), default is:%d\n", commit);
|
||||
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
|
||||
printf(" [-h help]: print out this help\n\n");
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
tsAsyncLog = 0;
|
||||
rpcInit.connType = TAOS_CONN_SERVER;
|
||||
taosInitLog("server.log", 100000, 10);
|
||||
|
||||
void *pRpc = rpcOpen(&rpcInit);
|
||||
if (pRpc == NULL) {
|
||||
tError("failed to start RPC server");
|
||||
return -1;
|
||||
}
|
||||
|
||||
tInfo("RPC server is running, ctrl-c to exit");
|
||||
|
||||
if (commit) {
|
||||
dataFd = open(dataName, O_APPEND | O_CREAT | O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
if (dataFd < 0) tInfo("failed to open data file, reason:%s", strerror(errno));
|
||||
}
|
||||
|
||||
qhandle = taosOpenQueue();
|
||||
qset = taosOpenQset();
|
||||
taosAddIntoQset(qset, qhandle, NULL);
|
||||
|
||||
processShellMsg();
|
||||
|
||||
if (dataFd >= 0) {
|
||||
close(dataFd);
|
||||
remove(dataName);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -354,6 +354,14 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SCH_NOT_EXIST, "Scheduler not exist")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_NOT_EXIST, "Task not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_ALREADY_EXIST, "Task already exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST, "Task result cache not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CANCELLED, "Task cancelled")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_DROPPED, "Task dropped")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CANCELLING, "Task cancelling")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_DROPPING, "Task dropping")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_DUPLICATTED_OPERATION, "Duplicatted operation")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_MSG_ERROR, "Task message error")
|
||||
|
||||
|
||||
|
||||
// grant
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired")
|
||||
|
|
Loading…
Reference in New Issue