Merge remote-tracking branch 'origin/3.0' into feature/mnode
This commit is contained in:
commit
5690df9a07
|
@ -1241,6 +1241,12 @@ typedef struct {
|
|||
char data[];
|
||||
} SVShowTablesFetchRsp;
|
||||
|
||||
typedef struct SMqCMGetSubEpReq {
|
||||
int64_t consumerId;
|
||||
int32_t epoch;
|
||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
} SMqCMGetSubEpReq;
|
||||
|
||||
#pragma pack(pop)
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) {
|
||||
|
@ -1562,7 +1568,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
|
|||
tlen += taosEncodeString(buf, pReq->logicalPlan);
|
||||
tlen += taosEncodeString(buf, pReq->physicalPlan);
|
||||
tlen += taosEncodeFixedU32(buf, pReq->qmsgLen);
|
||||
tlen += taosEncodeBinary(buf, pReq->qmsg, pReq->qmsgLen);
|
||||
tlen += taosEncodeString(buf, (char*)pReq->qmsg);
|
||||
//tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
|
||||
return tlen;
|
||||
}
|
||||
|
@ -1577,7 +1583,7 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
|
|||
buf = taosDecodeString(buf, &pReq->logicalPlan);
|
||||
buf = taosDecodeString(buf, &pReq->physicalPlan);
|
||||
buf = taosDecodeFixedU32(buf, &pReq->qmsgLen);
|
||||
buf = taosDecodeBinary(buf, &pReq->qmsg, pReq->qmsgLen);
|
||||
buf = taosDecodeString(buf, (char**)&pReq->qmsg);
|
||||
//buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
|
||||
return buf;
|
||||
}
|
||||
|
@ -1639,6 +1645,92 @@ typedef struct SMqConsumeReq {
|
|||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
} SMqConsumeReq;
|
||||
|
||||
typedef struct SMqSubVgEp {
|
||||
int32_t vgId;
|
||||
SEpSet epSet;
|
||||
} SMqSubVgEp;
|
||||
|
||||
typedef struct SMqSubTopicEp {
|
||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
SArray* vgs; // SArray<SMqSubVgEp>
|
||||
} SMqSubTopicEp;
|
||||
|
||||
typedef struct SMqCMGetSubEpRsp {
|
||||
int64_t consumerId;
|
||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
SArray* topics; // SArray<SMqSubTopicEp>
|
||||
} SMqCMGetSubEpRsp;
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI16(buf, pVgEp->vgId);
|
||||
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
|
||||
buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
|
||||
buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
|
||||
return buf;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeString(buf, pTopicEp->topic);
|
||||
int32_t sz = taosArrayGetSize(pTopicEp->vgs);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqSubVgEp* pVgEp = (SMqSubVgEp*)taosArrayGet(pTopicEp->vgs, i);
|
||||
tlen += tEncodeSMqSubVgEp(buf, pVgEp);
|
||||
}
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) {
|
||||
buf = taosDecodeStringTo(buf, pTopicEp->topic);
|
||||
int32_t sz;
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp));
|
||||
if (pTopicEp->vgs == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqSubVgEp vgEp;
|
||||
buf = tDecodeSMqSubVgEp(buf, &vgEp);
|
||||
taosArrayPush(pTopicEp->vgs, &vgEp);
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI64(buf, pRsp->consumerId);
|
||||
tlen += taosEncodeString(buf, pRsp->cgroup);
|
||||
int32_t sz = taosArrayGetSize(pRsp->topics);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqSubTopicEp* pVgEp = (SMqSubTopicEp*)taosArrayGet(pRsp->topics, i);
|
||||
tlen += tEncodeSMqSubTopicEp(buf, pVgEp);
|
||||
}
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) {
|
||||
buf = taosDecodeFixedI64(buf, &pRsp->consumerId);
|
||||
buf = taosDecodeStringTo(buf, pRsp->cgroup);
|
||||
int32_t sz;
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pRsp->topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
|
||||
if (pRsp->topics == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqSubTopicEp topicEp;
|
||||
buf = tDecodeSMqSubTopicEp(buf, &topicEp);
|
||||
taosArrayPush(pRsp->topics, &topicEp);
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -140,6 +140,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-timer", SMqTmrMsg, SMqTmrMsg)
|
||||
|
||||
// Requests handled by VNODE
|
||||
|
|
|
@ -45,7 +45,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input);
|
|||
* @param qId
|
||||
* @return
|
||||
*/
|
||||
int32_t qCreateExecTask(void* readHandle, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle);
|
||||
int32_t qCreateExecTask(void* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle);
|
||||
|
||||
/**
|
||||
* The main task execution function, including query on both table and multiple tables,
|
||||
|
|
|
@ -263,7 +263,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList)
|
|||
|
||||
typedef struct SMqClientVg {
|
||||
// statistics
|
||||
int64_t consumeCnt;
|
||||
int64_t pollCnt;
|
||||
// offset
|
||||
int64_t committedOffset;
|
||||
int64_t currentOffset;
|
||||
|
@ -345,6 +345,7 @@ tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t err
|
|||
strcpy(pTmq->groupId, conf->groupId);
|
||||
pTmq->commit_cb = conf->commit_cb;
|
||||
pTmq->consumerId = generateRequestId() & ((uint64_t)-1 >> 1);
|
||||
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
|
||||
return pTmq;
|
||||
}
|
||||
|
||||
|
@ -411,13 +412,12 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
|
|||
tSerializeSCMSubscribeReq(&abuf, &req);
|
||||
/*printf("formatted: %s\n", dagStr);*/
|
||||
|
||||
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TSDB_SQL_SELECT);
|
||||
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_SUBSCRIBE);
|
||||
if (pRequest == NULL) {
|
||||
tscError("failed to malloc sqlObj");
|
||||
}
|
||||
|
||||
pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
|
||||
pRequest->type = TDMT_MND_SUBSCRIBE;
|
||||
|
||||
SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
|
||||
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
||||
|
@ -596,6 +596,37 @@ struct tmq_message_t {
|
|||
SMqConsumeRsp rsp;
|
||||
};
|
||||
|
||||
int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
tmq_t* tmq = (tmq_t*)param;
|
||||
SMqCMGetSubEpRsp rsp;
|
||||
tDecodeSMqCMGetSubEpRsp(pMsg->pData, &rsp);
|
||||
int32_t sz = taosArrayGetSize(rsp.topics);
|
||||
// TODO: lock
|
||||
tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqClientTopic topic = {0};
|
||||
SMqSubTopicEp* pTopicEp = taosArrayGet(rsp.topics, i);
|
||||
topic.topicName = strdup(pTopicEp->topic);
|
||||
int32_t vgSz = taosArrayGetSize(pTopicEp->vgs);
|
||||
topic.vgs = taosArrayInit(vgSz, sizeof(SMqClientVg));
|
||||
for (int32_t j = 0; j < vgSz; j++) {
|
||||
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
|
||||
SMqClientVg clientVg = {
|
||||
.vgId = pVgEp->vgId,
|
||||
.epSet = pVgEp->epSet
|
||||
};
|
||||
taosArrayPush(topic.vgs, &clientVg);
|
||||
}
|
||||
taosArrayPush(tmq->clientTopics, &topic);
|
||||
}
|
||||
// unlock
|
||||
return 0;
|
||||
}
|
||||
|
||||
tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
|
||||
if (tmq->clientTopics == NULL || taosArrayGetSize(tmq->clientTopics) == 0) {
|
||||
return NULL;
|
||||
|
@ -605,9 +636,38 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
|
|||
req.reqType = 1;
|
||||
req.blockingTime = blocking_time;
|
||||
req.consumerId = tmq->consumerId;
|
||||
tmq_message_t* tmq_message = NULL;
|
||||
strcpy(req.cgroup, tmq->groupId);
|
||||
|
||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx);
|
||||
if (taosArrayGetSize(tmq->clientTopics) == 0) {
|
||||
int32_t tlen = sizeof(SMqCMGetSubEpReq);
|
||||
SMqCMGetSubEpReq* buf = malloc(tlen);
|
||||
if (buf == NULL) {
|
||||
tscError("failed to malloc get subscribe ep buf");
|
||||
}
|
||||
buf->consumerId = htobe64(buf->consumerId);
|
||||
|
||||
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP);
|
||||
if (pRequest == NULL) {
|
||||
tscError("failed to malloc subscribe ep request");
|
||||
}
|
||||
|
||||
pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
|
||||
|
||||
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
|
||||
sendInfo->requestObjRefId = 0;
|
||||
sendInfo->param = tmq;
|
||||
sendInfo->fp = tmq_ask_ep_cb;
|
||||
|
||||
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
||||
|
||||
int64_t transporterId = 0;
|
||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
}
|
||||
|
||||
SMqClientTopic* pTopic = taosArrayGetP(tmq->clientTopics, tmq->nextTopicIdx);
|
||||
tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics);
|
||||
strcpy(req.topic, pTopic->topicName);
|
||||
int32_t nextVgIdx = pTopic->nextVgIdx;
|
||||
|
@ -618,14 +678,17 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
|
|||
pRequest->body.requestMsg = (SDataBuf){ .pData = &req, .len = sizeof(SMqConsumeReq) };
|
||||
pRequest->type = TDMT_VND_CONSUME;
|
||||
|
||||
SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
|
||||
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
|
||||
sendInfo->requestObjRefId = 0;
|
||||
sendInfo->param = &tmq_message;
|
||||
sendInfo->fp = tmq_poll_cb_inner;
|
||||
|
||||
int64_t transporterId = 0;
|
||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, body);
|
||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
|
||||
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
|
||||
return (tmq_message_t*)pRequest->body.resInfo.pData;
|
||||
return tmq_message;
|
||||
|
||||
/*tsem_wait(&pRequest->body.rspSem);*/
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -115,6 +115,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
|
|||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg;
|
||||
/*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_SUB_EP)] = dndProcessMnodeReadMsg;
|
||||
|
||||
// Requests handled by VNODE
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg;
|
||||
|
|
|
@ -25,7 +25,7 @@ extern "C" {
|
|||
int32_t mndInitConsumer(SMnode *pMnode);
|
||||
void mndCleanupConsumer(SMnode *pMnode);
|
||||
|
||||
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId);
|
||||
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId);
|
||||
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer);
|
||||
|
||||
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer);
|
||||
|
|
|
@ -30,24 +30,23 @@
|
|||
#define MND_CONSUMER_VER_NUMBER 1
|
||||
#define MND_CONSUMER_RESERVE_SIZE 64
|
||||
|
||||
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
||||
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
||||
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer);
|
||||
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg);
|
||||
static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
|
||||
static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
|
||||
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
|
||||
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
||||
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
||||
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer);
|
||||
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg);
|
||||
static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
|
||||
static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
|
||||
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
|
||||
|
||||
int32_t mndInitConsumer(SMnode *pMnode) {
|
||||
SSdbTable table = {.sdbType = SDB_CONSUMER,
|
||||
.keyType = SDB_KEY_BINARY,
|
||||
.keyType = SDB_KEY_INT64,
|
||||
.encodeFp = (SdbEncodeFp)mndConsumerActionEncode,
|
||||
.decodeFp = (SdbDecodeFp)mndConsumerActionDecode,
|
||||
.insertFp = (SdbInsertFp)mndConsumerActionInsert,
|
||||
.updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
|
||||
.deleteFp = (SdbDeleteFp)mndConsumerActionDelete};
|
||||
|
||||
|
||||
return sdbSetTable(pMnode->pSdb, table);
|
||||
}
|
||||
|
||||
|
@ -61,10 +60,10 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
|
|||
SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
|
||||
if (pRaw == NULL) goto CM_ENCODE_OVER;
|
||||
|
||||
void* buf = malloc(tlen);
|
||||
void *buf = malloc(tlen);
|
||||
if (buf == NULL) goto CM_ENCODE_OVER;
|
||||
|
||||
void* abuf = buf;
|
||||
void *abuf = buf;
|
||||
tEncodeSMqConsumerObj(&abuf, pConsumer);
|
||||
|
||||
int32_t dataPos = 0;
|
||||
|
@ -106,7 +105,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
|
|||
int32_t dataPos = 0;
|
||||
int32_t len;
|
||||
SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
|
||||
void* buf = malloc(len);
|
||||
void *buf = malloc(len);
|
||||
if (buf == NULL) goto CM_DECODE_OVER;
|
||||
SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
|
||||
SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);
|
||||
|
@ -147,7 +146,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
|||
return 0;
|
||||
}
|
||||
|
||||
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId) {
|
||||
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId);
|
||||
if (pConsumer == NULL) {
|
||||
|
|
|
@ -30,6 +30,8 @@
|
|||
#define MND_SUBSCRIBE_VER_NUMBER 1
|
||||
#define MND_SUBSCRIBE_RESERVE_SIZE 64
|
||||
|
||||
static char *mndMakeSubscribeKey(char *cgroup, char *topicName);
|
||||
|
||||
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
|
||||
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
|
||||
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
|
||||
|
@ -41,9 +43,10 @@ static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg);
|
|||
static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg);
|
||||
|
||||
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
|
||||
SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic);
|
||||
SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pSub);
|
||||
|
||||
int32_t mndInitSubscribe(SMnode *pMnode) {
|
||||
SSdbTable table = {.sdbType = SDB_SUBSCRIBE,
|
||||
|
@ -57,9 +60,60 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
|
|||
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessGetSubEpReq);
|
||||
return sdbSetTable(pMnode->pSdb, table);
|
||||
}
|
||||
|
||||
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->pCont;
|
||||
SMqCMGetSubEpRsp rsp;
|
||||
int64_t consumerId = be64toh(pReq->consumerId);
|
||||
|
||||
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pMnode, consumerId);
|
||||
if (pConsumer == NULL) {
|
||||
/*terrno = */
|
||||
return -1;
|
||||
}
|
||||
ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);
|
||||
|
||||
strcpy(rsp.cgroup, pReq->cgroup);
|
||||
rsp.consumerId = consumerId;
|
||||
SArray *pTopics = pConsumer->topics;
|
||||
int32_t sz = taosArrayGetSize(pTopics);
|
||||
rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqSubTopicEp topicEp;
|
||||
SMqConsumerTopic *pConsumerTopic = taosArrayGet(pTopics, i);
|
||||
strcpy(topicEp.topic, pConsumerTopic->name);
|
||||
|
||||
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, pConsumerTopic->name);
|
||||
int32_t assignedSz = taosArrayGetSize(pSub->assigned);
|
||||
topicEp.vgs = taosArrayInit(assignedSz, sizeof(SMqSubVgEp));
|
||||
for (int32_t j = 0; j < assignedSz; j++) {
|
||||
SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i);
|
||||
if (pCEp->consumerId == consumerId) {
|
||||
taosArrayPush(pSub->assigned, pCEp);
|
||||
}
|
||||
}
|
||||
if (taosArrayGetSize(topicEp.vgs) != 0) {
|
||||
taosArrayPush(rsp.topics, &topicEp);
|
||||
}
|
||||
}
|
||||
int32_t tlen = tEncodeSMqCMGetSubEpRsp(NULL, &rsp);
|
||||
void *buf = malloc(tlen);
|
||||
if (buf == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
void *abuf = buf;
|
||||
tEncodeSMqCMGetSubEpRsp(&abuf, &rsp);
|
||||
//TODO: free rsp
|
||||
pMsg->pCont = buf;
|
||||
pMsg->contLen = tlen;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) {
|
||||
int i = 0;
|
||||
while (key[i] != ':') {
|
||||
|
@ -97,7 +151,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
|||
|
||||
// build msg
|
||||
|
||||
SMqSetCVgReq* pReq = malloc(sizeof(SMqSetCVgReq) + pCEp->qmsgLen);
|
||||
SMqSetCVgReq *pReq = malloc(sizeof(SMqSetCVgReq));
|
||||
if (pReq == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
|
@ -108,7 +162,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
|||
pReq->logicalPlan = strdup(pTopic->logicalPlan);
|
||||
pReq->physicalPlan = strdup(pTopic->physicalPlan);
|
||||
pReq->qmsgLen = pCEp->qmsgLen;
|
||||
memcpy(pReq->qmsg, pCEp->qmsg, pCEp->qmsgLen);
|
||||
/*memcpy(pReq->qmsg, pCEp->qmsg, pCEp->qmsgLen);*/
|
||||
pReq->qmsg = strdup(pCEp->qmsg);
|
||||
int32_t tlen = tEncodeSMqSetCVgReq(NULL, pReq);
|
||||
void *reqStr = malloc(tlen);
|
||||
if (reqStr == NULL) {
|
||||
|
@ -146,11 +201,11 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
|||
}
|
||||
|
||||
static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) {
|
||||
//convert phyplan to dag
|
||||
// convert phyplan to dag
|
||||
SQueryDag *pDag = qStringToDag(pTopic->physicalPlan);
|
||||
SArray *pArray;
|
||||
SArray* inner = taosArrayGet(pDag->pSubplans, 0);
|
||||
SSubplan *plan = taosArrayGetP(inner, 0);
|
||||
SArray *pArray;
|
||||
SArray *inner = taosArrayGet(pDag->pSubplans, 0);
|
||||
SSubplan *plan = taosArrayGetP(inner, 0);
|
||||
plan->execNode.inUse = 0;
|
||||
strcpy(plan->execNode.epAddr[0].fqdn, "localhost");
|
||||
plan->execNode.epAddr[0].port = 6030;
|
||||
|
@ -161,21 +216,24 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
|
|||
return -1;
|
||||
}
|
||||
int32_t sz = taosArrayGetSize(pArray);
|
||||
//convert dag to msg
|
||||
// convert dag to msg
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqConsumerEp CEp;
|
||||
CEp.status = 0;
|
||||
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
|
||||
STaskInfo* pTaskInfo = taosArrayGet(pArray, i);
|
||||
STaskInfo *pTaskInfo = taosArrayGet(pArray, i);
|
||||
tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr);
|
||||
/*mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1], CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/
|
||||
/*mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1],
|
||||
* CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/
|
||||
CEp.vgId = pTaskInfo->addr.nodeId;
|
||||
CEp.qmsgLen = pTaskInfo->msg->contentLen;
|
||||
CEp.qmsg = malloc(CEp.qmsgLen);
|
||||
if (CEp.qmsg == NULL) {
|
||||
return -1;
|
||||
}
|
||||
memcpy(CEp.qmsg, pTaskInfo->msg->msg, pTaskInfo->msg->contentLen);
|
||||
CEp.qmsg = strdup(pTaskInfo->msg->msg);
|
||||
CEp.qmsgLen = strlen(CEp.qmsg) + 1;
|
||||
printf("abc:\n%s\n", CEp.qmsg);
|
||||
/*CEp.qmsg = malloc(CEp.qmsgLen);*/
|
||||
/*if (CEp.qmsg == NULL) {*/
|
||||
/*return -1;*/
|
||||
/*}*/
|
||||
/*memcpy(CEp.qmsg, pTaskInfo->msg->msg, pTaskInfo->msg->contentLen);*/
|
||||
taosArrayPush(unassignedVg, &CEp);
|
||||
}
|
||||
|
||||
|
@ -184,7 +242,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
|
|||
}
|
||||
|
||||
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
|
||||
SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic) {
|
||||
SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pCEp) {
|
||||
int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
int32_t vgId = *(int32_t *)taosArrayGet(pConsumerTopic->pVgInfo, i);
|
||||
|
@ -199,6 +257,8 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
|
|||
req.sql = pTopic->sql;
|
||||
req.logicalPlan = pTopic->logicalPlan;
|
||||
req.physicalPlan = pTopic->physicalPlan;
|
||||
req.qmsg = strdup(pCEp->qmsg);
|
||||
req.qmsgLen = strlen(req.qmsg);
|
||||
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
|
||||
void *buf = malloc(sizeof(SMsgHead) + tlen);
|
||||
if (buf == NULL) {
|
||||
|
@ -206,18 +266,18 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
|
|||
return -1;
|
||||
}
|
||||
|
||||
SMsgHead* pMsgHead = (SMsgHead*)buf;
|
||||
SMsgHead *pMsgHead = (SMsgHead *)buf;
|
||||
|
||||
pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
|
||||
pMsgHead->vgId = htonl(vgId);
|
||||
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
tEncodeSMqSetCVgReq(&abuf, &req);
|
||||
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
action.pCont = buf;
|
||||
action.contLen = tlen;
|
||||
action.contLen = sizeof(SMsgHead) + tlen;
|
||||
action.msgType = TDMT_VND_MQ_SET_CONN;
|
||||
|
||||
mndReleaseVgroup(pMnode, pVgObj);
|
||||
|
@ -285,7 +345,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
|
|||
int32_t dataPos = 0;
|
||||
int32_t tlen;
|
||||
SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
|
||||
void *buf = malloc(tlen + 1);
|
||||
void *buf = malloc(tlen + 1);
|
||||
if (buf == NULL) goto SUB_DECODE_OVER;
|
||||
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
|
||||
SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);
|
||||
|
@ -493,11 +553,11 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
|||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
char* key = mndMakeSubscribeKey(consumerGroup, newTopicName);
|
||||
char *key = mndMakeSubscribeKey(consumerGroup, newTopicName);
|
||||
strcpy(pSub->key, key);
|
||||
// set unassigned vg
|
||||
mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg);
|
||||
//TODO: disable alter
|
||||
// TODO: disable alter
|
||||
}
|
||||
taosArrayPush(pSub->availConsumer, &consumerId);
|
||||
|
||||
|
@ -506,12 +566,15 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
|||
|
||||
if (taosArrayGetSize(pConsumerTopic->pVgInfo) > 0) {
|
||||
ASSERT(taosArrayGetSize(pConsumerTopic->pVgInfo) == 1);
|
||||
int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo);
|
||||
// send setmsg to vnode
|
||||
if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic) < 0) {
|
||||
// TODO
|
||||
return -1;
|
||||
int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo);
|
||||
SMqConsumerEp *pCEp = taosArrayGetLast(pSub->assigned);
|
||||
if (pCEp->vgId == vgId) {
|
||||
if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic, pCEp) < 0) {
|
||||
// TODO
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
// send setmsg to vnode
|
||||
}
|
||||
|
||||
SSdbRaw *pRaw = mndSubActionEncode(pSub);
|
||||
|
|
|
@ -104,8 +104,7 @@ typedef void* tsdbReadHandleT;
|
|||
* @param qinfo query info handle from query processor
|
||||
* @return
|
||||
*/
|
||||
tsdbReadHandleT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId,
|
||||
void *pRef);
|
||||
tsdbReadHandleT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, uint64_t taskId);
|
||||
|
||||
/**
|
||||
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
|
||||
|
|
|
@ -811,7 +811,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
|
|||
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
|
||||
pTopic->buffer.output[i].status = 0;
|
||||
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
|
||||
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&req.qmsg, pReadHandle);
|
||||
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, pReadHandle);
|
||||
}
|
||||
taosArrayPush(pConsumer->topics, pTopic);
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -132,7 +132,7 @@ typedef struct STsdbReadHandle {
|
|||
bool loadExternalRow; // load time window external data rows
|
||||
bool currentLoadExternalRows; // current load external rows
|
||||
int32_t loadType; // block load type
|
||||
uint64_t qId; // query info handle, for debug purpose
|
||||
char *idStr; // query info handle, for debug purpose
|
||||
int32_t type; // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
|
||||
SDFileSet* pFileGroup;
|
||||
SFSIter fileIter;
|
||||
|
@ -306,7 +306,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
|
|||
}
|
||||
|
||||
taosArrayPush(pTableCheckInfo, &info);
|
||||
tsdbDebug("%p check table uid:%"PRId64" from lastKey:%"PRId64" 0x%"PRIx64, pTsdbReadHandle, info.tableId, info.lastKey, pTsdbReadHandle->qId);
|
||||
tsdbDebug("%p check table uid:%"PRId64" from lastKey:%"PRId64" %s", pTsdbReadHandle, info.tableId, info.lastKey, pTsdbReadHandle->idStr);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -389,13 +389,13 @@ static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, STsdbQueryCond*
|
|||
}
|
||||
|
||||
if (updateTs) {
|
||||
tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64
|
||||
", 0x%" PRIx64, pTsdbReadHandle, pCond->twindow.skey, pCond->twindow.ekey, pTsdbReadHandle->window.skey,
|
||||
pTsdbReadHandle->window.ekey, pTsdbReadHandle->qId);
|
||||
tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64 ", %s",
|
||||
pTsdbReadHandle, pCond->twindow.skey, pCond->twindow.ekey, pTsdbReadHandle->window.skey,
|
||||
pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
|
||||
}
|
||||
}
|
||||
|
||||
static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, uint64_t qId, STsdbMemTable* pMemRef) {
|
||||
static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, uint64_t qId, uint64_t taskId) {
|
||||
STsdbReadHandle* pReadHandle = calloc(1, sizeof(STsdbReadHandle));
|
||||
if (pReadHandle == NULL) {
|
||||
goto _end;
|
||||
|
@ -408,7 +408,6 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
|
|||
pReadHandle->cur.win = TSWINDOW_INITIALIZER;
|
||||
pReadHandle->checkFiles = true;
|
||||
pReadHandle->activeIndex = 0; // current active table index
|
||||
pReadHandle->qId = qId;
|
||||
pReadHandle->allocSize = 0;
|
||||
pReadHandle->locateStart = false;
|
||||
pReadHandle->loadType = pCond->type;
|
||||
|
@ -417,6 +416,10 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
|
|||
pReadHandle->loadExternalRow = pCond->loadExternalRows;
|
||||
pReadHandle->currentLoadExternalRows = pCond->loadExternalRows;
|
||||
|
||||
char buf[128] = {0};
|
||||
snprintf(buf, tListLen(buf), "TID:0x%"PRIx64" QID:0x%"PRIx64, taskId, qId);
|
||||
pReadHandle->idStr = strdup(buf);
|
||||
|
||||
if (tsdbInitReadH(&pReadHandle->rhelper, (STsdb*)tsdb) != 0) {
|
||||
goto _end;
|
||||
}
|
||||
|
@ -455,7 +458,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
|
|||
|
||||
pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->config.maxRowsPerFileBlock);
|
||||
if (pReadHandle->pDataCols == NULL) {
|
||||
tsdbError("%p failed to malloc buf for pDataCols, %"PRIu64, pReadHandle, pReadHandle->qId);
|
||||
tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr);
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
|
@ -471,8 +474,8 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
|
|||
return NULL;
|
||||
}
|
||||
|
||||
tsdbReadHandleT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, void* pRef) {
|
||||
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, pRef);
|
||||
tsdbReadHandleT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, uint64_t taskId) {
|
||||
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, taskId);
|
||||
if (pTsdbReadHandle == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -492,9 +495,7 @@ tsdbReadHandleT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroup
|
|||
return NULL;
|
||||
}
|
||||
|
||||
// tsdbMayTakeMemSnapshot(pTsdbReadHandle, psTable);
|
||||
|
||||
tsdbDebug("%p total numOfTable:%" PRIzu " in query, 0x%"PRIx64, pTsdbReadHandle, taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), pTsdbReadHandle->qId);
|
||||
tsdbDebug("%p total numOfTable:%" PRIzu " in query, %s", pTsdbReadHandle, taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), pTsdbReadHandle->idStr);
|
||||
return (tsdbReadHandleT) pTsdbReadHandle;
|
||||
}
|
||||
|
||||
|
@ -575,7 +576,7 @@ void tsdbResetQueryHandleForNewTable(tsdbReadHandleT queryHandle, STsdbQueryCond
|
|||
// pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
|
||||
}
|
||||
|
||||
tsdbReadHandleT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, STsdbMemTable* pMemRef) {
|
||||
tsdbReadHandleT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, uint64_t taskId) {
|
||||
pCond->twindow = updateLastrowForEachGroup(groupList);
|
||||
|
||||
// no qualified table
|
||||
|
@ -583,7 +584,7 @@ tsdbReadHandleT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroup
|
|||
return NULL;
|
||||
}
|
||||
|
||||
STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef);
|
||||
STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, taskId);
|
||||
if (pTsdbReadHandle == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -666,7 +667,7 @@ static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGr
|
|||
return pNew;
|
||||
}
|
||||
|
||||
tsdbReadHandleT tsdbQueryRowsInExternalWindow(STsdb *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, uint64_t qId, STsdbMemTable* pRef) {
|
||||
tsdbReadHandleT tsdbQueryRowsInExternalWindow(STsdb *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, uint64_t qId, uint64_t taskId) {
|
||||
STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList);
|
||||
|
||||
if (pNew->numOfTables == 0) {
|
||||
|
@ -681,7 +682,7 @@ tsdbReadHandleT tsdbQueryRowsInExternalWindow(STsdb *tsdb, STsdbQueryCond* pCond
|
|||
}
|
||||
}
|
||||
|
||||
STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, pNew, qId, pRef);
|
||||
STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, pNew, qId, taskId);
|
||||
pTsdbReadHandle->loadExternalRow = true;
|
||||
pTsdbReadHandle->currentLoadExternalRows = true;
|
||||
|
||||
|
@ -734,8 +735,8 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe
|
|||
SMemRow row = (SMemRow)SL_GET_NODE_DATA(node);
|
||||
TSKEY key = memRowKey(row); // first timestamp in buffer
|
||||
tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
|
||||
"-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", 0x%"PRIx64,
|
||||
pHandle, pCheckInfo->tableId, key, order, (*pMem)->keyMin, (*pMem)->keyMax, pCheckInfo->lastKey, (*pMem)->nrows, pHandle->qId);
|
||||
"-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %s",
|
||||
pHandle, pCheckInfo->tableId, key, order, (*pMem)->keyMin, (*pMem)->keyMax, pCheckInfo->lastKey, (*pMem)->nrows, pHandle->idStr);
|
||||
|
||||
if (ASCENDING_TRAVERSE(order)) {
|
||||
assert(pCheckInfo->lastKey <= key);
|
||||
|
@ -744,7 +745,7 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe
|
|||
}
|
||||
|
||||
} else {
|
||||
tsdbDebug("%p uid:%"PRId64", no data in mem, 0x%"PRIx64, pHandle, pCheckInfo->tableId, pHandle->qId);
|
||||
tsdbDebug("%p uid:%"PRId64", no data in mem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
|
||||
}
|
||||
|
||||
if (!imemEmpty) {
|
||||
|
@ -754,8 +755,8 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe
|
|||
SMemRow row = (SMemRow)SL_GET_NODE_DATA(node);
|
||||
TSKEY key = memRowKey(row); // first timestamp in buffer
|
||||
tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
|
||||
"-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", 0x%"PRIx64,
|
||||
pHandle, pCheckInfo->tableId, key, order, (*pIMem)->keyMin, (*pIMem)->keyMax, pCheckInfo->lastKey, (*pIMem)->nrows, pHandle->qId);
|
||||
"-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %s",
|
||||
pHandle, pCheckInfo->tableId, key, order, (*pIMem)->keyMin, (*pIMem)->keyMax, pCheckInfo->lastKey, (*pIMem)->nrows, pHandle->idStr);
|
||||
|
||||
if (ASCENDING_TRAVERSE(order)) {
|
||||
assert(pCheckInfo->lastKey <= key);
|
||||
|
@ -763,7 +764,7 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe
|
|||
assert(pCheckInfo->lastKey >= key);
|
||||
}
|
||||
} else {
|
||||
tsdbDebug("%p uid:%"PRId64", no data in imem, 0x%"PRIx64, pHandle, pCheckInfo->tableId, pHandle->qId);
|
||||
tsdbDebug("%p uid:%"PRId64", no data in imem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
|
||||
}
|
||||
|
||||
return true;
|
||||
|
@ -952,8 +953,8 @@ static bool hasMoreDataInCache(STsdbReadHandle* pHandle) {
|
|||
}
|
||||
|
||||
pCheckInfo->lastKey = memRowKey(row); // first timestamp in buffer
|
||||
tsdbDebug("%p uid:%" PRId64", check data in buffer from skey:%" PRId64 ", order:%d, 0x%"PRIx64, pHandle,
|
||||
pCheckInfo->tableId, pCheckInfo->lastKey, pHandle->order, pHandle->qId);
|
||||
tsdbDebug("%p uid:%" PRId64", check data in buffer from skey:%" PRId64 ", order:%d, %s", pHandle,
|
||||
pCheckInfo->tableId, pCheckInfo->lastKey, pHandle->order, pHandle->idStr);
|
||||
|
||||
// all data in mem are checked already.
|
||||
if ((pCheckInfo->lastKey > pHandle->window.ekey && ASCENDING_TRAVERSE(pHandle->order)) ||
|
||||
|
@ -1138,21 +1139,21 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl
|
|||
STSchema *pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);
|
||||
int32_t code = tdInitDataCols(pTsdbReadHandle->pDataCols, pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("%p failed to malloc buf for pDataCols, 0x%"PRIx64, pTsdbReadHandle, pTsdbReadHandle->qId);
|
||||
tsdbError("%p failed to malloc buf for pDataCols, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
code = tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[0], pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("%p failed to malloc buf for rhelper.pDataCols[0], 0x%"PRIx64, pTsdbReadHandle, pTsdbReadHandle->qId);
|
||||
tsdbError("%p failed to malloc buf for rhelper.pDataCols[0], %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
code = tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[1], pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("%p failed to malloc buf for rhelper.pDataCols[1], 0x%"PRIx64, pTsdbReadHandle, pTsdbReadHandle->qId);
|
||||
tsdbError("%p failed to malloc buf for rhelper.pDataCols[1], %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
}
|
||||
|
@ -1188,15 +1189,15 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl
|
|||
int64_t elapsedTime = (taosGetTimestampUs() - st);
|
||||
pTsdbReadHandle->cost.blockLoadTime += elapsedTime;
|
||||
|
||||
tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, elapsed time:%"PRId64 " us, 0x%"PRIx64,
|
||||
pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime, pTsdbReadHandle->qId);
|
||||
tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, elapsed time:%"PRId64 " us, %s",
|
||||
pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime, pTsdbReadHandle->idStr);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_error:
|
||||
pBlock->numOfRows = 0;
|
||||
|
||||
tsdbError("%p error occurs in loading file block, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, 0x%"PRIx64,
|
||||
pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, pTsdbReadHandle->qId);
|
||||
tsdbError("%p error occurs in loading file block, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, %s",
|
||||
pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, pTsdbReadHandle->idStr);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
|
@ -1219,9 +1220,9 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
|
|||
key = extractFirstTraverseKey(pCheckInfo, pTsdbReadHandle->order, pCfg->update);
|
||||
|
||||
if (key != TSKEY_INITIAL_VAL) {
|
||||
tsdbDebug("%p key in mem:%"PRId64", 0x%"PRIx64, pTsdbReadHandle, key, pTsdbReadHandle->qId);
|
||||
tsdbDebug("%p key in mem:%"PRId64", %s", pTsdbReadHandle, key, pTsdbReadHandle->idStr);
|
||||
} else {
|
||||
tsdbDebug("%p no data in mem, 0x%"PRIx64, pTsdbReadHandle, pTsdbReadHandle->qId);
|
||||
tsdbDebug("%p no data in mem, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
|
||||
}
|
||||
|
||||
if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
|
||||
|
@ -1288,11 +1289,11 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
|
|||
|
||||
assert(cur->blockCompleted);
|
||||
if (cur->rows == binfo.rows) {
|
||||
tsdbDebug("%p whole file block qualified, brange:%"PRId64"-%"PRId64", rows:%d, lastKey:%"PRId64", %"PRIx64,
|
||||
pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pTsdbReadHandle->qId);
|
||||
tsdbDebug("%p whole file block qualified, brange:%"PRId64"-%"PRId64", rows:%d, lastKey:%"PRId64", %s",
|
||||
pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pTsdbReadHandle->idStr);
|
||||
} else {
|
||||
tsdbDebug("%p create data block from remain file block, brange:%"PRId64"-%"PRId64", rows:%d, total:%d, lastKey:%"PRId64", %"PRIx64,
|
||||
pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, binfo.rows, cur->lastKey, pTsdbReadHandle->qId);
|
||||
tsdbDebug("%p create data block from remain file block, brange:%"PRId64"-%"PRId64", rows:%d, total:%d, lastKey:%"PRId64", %s",
|
||||
pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, binfo.rows, cur->lastKey, pTsdbReadHandle->idStr);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1815,8 +1816,8 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa
|
|||
updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
|
||||
doCheckGeneratedBlockRange(pTsdbReadHandle);
|
||||
|
||||
tsdbDebug("%p uid:%" PRIu64", data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, 0x%"PRIx64,
|
||||
pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, pTsdbReadHandle->qId);
|
||||
tsdbDebug("%p uid:%" PRIu64", data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, %s",
|
||||
pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, pTsdbReadHandle->idStr);
|
||||
}
|
||||
|
||||
int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo) {
|
||||
|
@ -1868,9 +1869,9 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
|||
int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);
|
||||
|
||||
tsdbDebug("%p uid:%" PRIu64" start merge data block, file block range:%"PRIu64"-%"PRIu64" rows:%d, start:%d,"
|
||||
"end:%d, 0x%"PRIx64,
|
||||
"end:%d, %s",
|
||||
pTsdbReadHandle, pCheckInfo->tableId, blockInfo.window.skey, blockInfo.window.ekey,
|
||||
blockInfo.rows, cur->pos, endPos, pTsdbReadHandle->qId);
|
||||
blockInfo.rows, cur->pos, endPos, pTsdbReadHandle->idStr);
|
||||
|
||||
// compared with the data from in-memory buffer, to generate the correct timestamp array list
|
||||
int32_t numOfRows = 0;
|
||||
|
@ -2027,8 +2028,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
|||
updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
|
||||
doCheckGeneratedBlockRange(pTsdbReadHandle);
|
||||
|
||||
tsdbDebug("%p uid:%" PRIu64", data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, 0x%"PRIx64,
|
||||
pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, pTsdbReadHandle->qId);
|
||||
tsdbDebug("%p uid:%" PRIu64", data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, %s",
|
||||
pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, pTsdbReadHandle->idStr);
|
||||
}
|
||||
|
||||
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
|
||||
|
@ -2202,13 +2203,13 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu
|
|||
memcpy(pTsdbReadHandle->pDataBlockInfo, sup.pDataBlockInfo[0], sizeof(STableBlockInfo) * numOfBlocks);
|
||||
cleanBlockOrderSupporter(&sup, numOfQualTables);
|
||||
|
||||
tsdbDebug("%p create data blocks info struct completed for 1 table, %d blocks not sorted 0x%"PRIx64, pTsdbReadHandle, cnt,
|
||||
pTsdbReadHandle->qId);
|
||||
tsdbDebug("%p create data blocks info struct completed for 1 table, %d blocks not sorted %s", pTsdbReadHandle, cnt,
|
||||
pTsdbReadHandle->idStr);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables 0x%"PRIx64, pTsdbReadHandle, cnt,
|
||||
numOfQualTables, pTsdbReadHandle->qId);
|
||||
tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pTsdbReadHandle, cnt,
|
||||
numOfQualTables, pTsdbReadHandle->idStr);
|
||||
|
||||
assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pTableQueryInfo[j]->numOfBlocks may be 0
|
||||
sup.numOfTables = numOfQualTables;
|
||||
|
@ -2244,7 +2245,7 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu
|
|||
* }
|
||||
*/
|
||||
|
||||
tsdbDebug("%p %d data blocks sort completed, 0x%"PRIx64, pTsdbReadHandle, cnt, pTsdbReadHandle->qId);
|
||||
tsdbDebug("%p %d data blocks sort completed, %s", pTsdbReadHandle, cnt, pTsdbReadHandle->idStr);
|
||||
cleanBlockOrderSupporter(&sup, numOfTables);
|
||||
free(pTree);
|
||||
|
||||
|
@ -2302,8 +2303,8 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi
|
|||
if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && win.skey > pTsdbReadHandle->window.ekey) ||
|
||||
(!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && win.ekey < pTsdbReadHandle->window.ekey)) {
|
||||
tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
|
||||
tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, 0x%"PRIx64, pTsdbReadHandle,
|
||||
pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->qId);
|
||||
tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle,
|
||||
pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
|
||||
pTsdbReadHandle->pFileGroup = NULL;
|
||||
assert(pTsdbReadHandle->numOfBlocks == 0);
|
||||
break;
|
||||
|
@ -2326,8 +2327,8 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi
|
|||
break;
|
||||
}
|
||||
|
||||
tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, 0x%"PRIx64, pTsdbReadHandle, numOfBlocks, numOfTables,
|
||||
pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->qId);
|
||||
tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pTsdbReadHandle, numOfBlocks, numOfTables,
|
||||
pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->idStr);
|
||||
|
||||
assert(numOfBlocks >= 0);
|
||||
if (numOfBlocks == 0) {
|
||||
|
@ -2420,8 +2421,8 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReadHandleT* queryHandle, STableBlockDist*
|
|||
if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && win.skey > pTsdbReadHandle->window.ekey) ||
|
||||
(!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && win.ekey < pTsdbReadHandle->window.ekey)) {
|
||||
tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
|
||||
tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, 0x%"PRIx64, pTsdbReadHandle,
|
||||
pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->qId);
|
||||
tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle,
|
||||
pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
|
||||
pTsdbReadHandle->pFileGroup = NULL;
|
||||
break;
|
||||
}
|
||||
|
@ -2444,8 +2445,8 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReadHandleT* queryHandle, STableBlockDist*
|
|||
break;
|
||||
}
|
||||
|
||||
tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, 0x%"PRIx64, pTsdbReadHandle, numOfBlocks, numOfTables,
|
||||
pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->qId);
|
||||
tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pTsdbReadHandle, numOfBlocks, numOfTables,
|
||||
pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->idStr);
|
||||
|
||||
if (numOfBlocks == 0) {
|
||||
continue;
|
||||
|
@ -2499,8 +2500,8 @@ static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exis
|
|||
if ((!cur->mixBlock) || cur->blockCompleted) {
|
||||
// all data blocks in current file has been checked already, try next file if exists
|
||||
} else {
|
||||
tsdbDebug("%p continue in current data block, index:%d, pos:%d, 0x%"PRIx64, pTsdbReadHandle, cur->slot, cur->pos,
|
||||
pTsdbReadHandle->qId);
|
||||
tsdbDebug("%p continue in current data block, index:%d, pos:%d, %s", pTsdbReadHandle, cur->slot, cur->pos,
|
||||
pTsdbReadHandle->idStr);
|
||||
int32_t code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlockInfo->compBlock, pCheckInfo);
|
||||
*exists = (pTsdbReadHandle->realNumOfRows > 0);
|
||||
|
||||
|
@ -2624,8 +2625,8 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
|
|||
}
|
||||
|
||||
int64_t elapsedTime = taosGetTimestampUs() - st;
|
||||
tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d, 0x%"PRIx64, pTsdbReadHandle,
|
||||
elapsedTime, numOfRows, numOfCols, pTsdbReadHandle->qId);
|
||||
tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d, %s", pTsdbReadHandle,
|
||||
elapsedTime, numOfRows, numOfCols, pTsdbReadHandle->idStr);
|
||||
|
||||
return numOfRows;
|
||||
}
|
||||
|
@ -2937,7 +2938,7 @@ bool tsdbNextDataBlock(tsdbReadHandleT pHandle) {
|
|||
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle;
|
||||
|
||||
if (emptyQueryTimewindow(pTsdbReadHandle)) {
|
||||
tsdbDebug("%p query window not overlaps with the data set, no result returned, 0x%"PRIx64, pTsdbReadHandle, pTsdbReadHandle->qId);
|
||||
tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -3052,7 +3053,7 @@ bool tsdbNextDataBlock(tsdbReadHandleT pHandle) {
|
|||
// memcpy(&cond.colList[i], &pColInfoData->info, sizeof(SColumnInfo));
|
||||
// }
|
||||
//
|
||||
// pSecQueryHandle = tsdbQueryTablesImpl(pTsdbReadHandle->pTsdb, &cond, pTsdbReadHandle->qId, pMemRef);
|
||||
// pSecQueryHandle = tsdbQueryTablesImpl(pTsdbReadHandle->pTsdb, &cond, pTsdbReadHandle->idStr, pMemRef);
|
||||
// tfree(cond.colList);
|
||||
//
|
||||
// // current table, only one table
|
||||
|
@ -3910,8 +3911,8 @@ void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle) {
|
|||
|
||||
SIOCostSummary* pCost = &pTsdbReadHandle->cost;
|
||||
|
||||
tsdbDebug("%p :io-cost summary: head-file read cnt:%"PRIu64", head-file time:%"PRIu64" us, statis-info:%"PRId64" us, datablock:%" PRId64" us, check data:%"PRId64" us, 0x%"PRIx64,
|
||||
pTsdbReadHandle, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime, pCost->blockLoadTime, pCost->checkForNextTime, pTsdbReadHandle->qId);
|
||||
tsdbDebug("%p :io-cost summary: head-file read cnt:%"PRIu64", head-file time:%"PRIu64" us, statis-info:%"PRId64" us, datablock:%" PRId64" us, check data:%"PRId64" us, %s",
|
||||
pTsdbReadHandle, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime, pCost->blockLoadTime, pCost->checkForNextTime, pTsdbReadHandle->idStr);
|
||||
|
||||
tfree(pTsdbReadHandle);
|
||||
}
|
||||
|
|
|
@ -73,8 +73,8 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
tb_uid_t uid;
|
||||
int32_t nCols;
|
||||
int32_t nTagCols;
|
||||
SSchemaWrapper *pSW;
|
||||
STableMetaRsp * pTbMetaMsg = NULL;
|
||||
SSchemaWrapper *pSW = NULL;
|
||||
STableMetaRsp *pTbMetaMsg = NULL;
|
||||
SSchema * pTagSchema;
|
||||
SRpcMsg rpcMsg;
|
||||
int msgLen = 0;
|
||||
|
@ -145,15 +145,22 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
|
||||
_exit:
|
||||
|
||||
free(pSW->pSchema);
|
||||
free(pSW);
|
||||
free(pTbCfg->name);
|
||||
free(pTbCfg);
|
||||
if (pTbCfg->type == META_SUPER_TABLE) {
|
||||
free(pTbCfg->stbCfg.pTagSchema);
|
||||
} else if (pTbCfg->type == META_SUPER_TABLE) {
|
||||
kvRowFree(pTbCfg->ctbCfg.pTag);
|
||||
if (pSW != NULL) {
|
||||
tfree(pSW->pSchema);
|
||||
tfree(pSW);
|
||||
}
|
||||
|
||||
if (pTbCfg) {
|
||||
tfree(pTbCfg->name);
|
||||
if (pTbCfg->type == META_SUPER_TABLE) {
|
||||
free(pTbCfg->stbCfg.pTagSchema);
|
||||
} else if (pTbCfg->type == META_SUPER_TABLE) {
|
||||
kvRowFree(pTbCfg->ctbCfg.pTag);
|
||||
}
|
||||
|
||||
tfree(pTbCfg);
|
||||
}
|
||||
|
||||
rpcMsg.handle = pMsg->handle;
|
||||
rpcMsg.ahandle = pMsg->ahandle;
|
||||
rpcMsg.pCont = pTbMetaMsg;
|
||||
|
|
|
@ -38,7 +38,7 @@
|
|||
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
|
||||
#define GET_RES_EXT_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t) + POINTER_BYTES)
|
||||
|
||||
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.queryId)
|
||||
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.idstr)
|
||||
|
||||
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
|
||||
|
||||
|
|
|
@ -240,6 +240,7 @@ typedef struct STaskIdInfo {
|
|||
uint64_t subplanId;
|
||||
uint64_t templateId;
|
||||
uint64_t taskId; // this is a subplan id
|
||||
char *idstr;
|
||||
} STaskIdInfo;
|
||||
|
||||
typedef struct SExecTaskInfo {
|
||||
|
@ -377,8 +378,10 @@ typedef struct SExchangeInfo {
|
|||
SSDataBlock *pResult;
|
||||
int32_t current;
|
||||
uint64_t rowsOfCurrentSource;
|
||||
uint64_t bytes; // total load bytes from remote
|
||||
uint64_t totalRows;
|
||||
|
||||
uint64_t totalSize; // total load bytes from remote
|
||||
uint64_t totalRows; // total number of rows
|
||||
uint64_t totalElapsed;// total elapsed time
|
||||
} SExchangeInfo;
|
||||
|
||||
typedef struct STableScanInfo {
|
||||
|
@ -660,6 +663,6 @@ int32_t getMaximumIdleDurationSec();
|
|||
|
||||
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type);
|
||||
void setTaskStatus(SExecTaskInfo *pTaskInfo, int8_t status);
|
||||
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle);
|
||||
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle, uint64_t taskId);
|
||||
|
||||
#endif // TDENGINE_EXECUTORIMPL_H
|
||||
|
|
|
@ -50,11 +50,11 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) {
|
|||
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo;
|
||||
|
||||
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, input, GET_TASKID(pTaskInfo));
|
||||
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, input, pTaskInfo->id.queryId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to set the stream block data, reqId:0x%"PRIx64, GET_TASKID(pTaskInfo));
|
||||
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
|
||||
} else {
|
||||
qDebug("set the stream block successfully, reqId:0x%"PRIx64, GET_TASKID(pTaskInfo));
|
||||
qDebug("%s set the stream block successfully", GET_TASKID(pTaskInfo));
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -81,7 +81,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
|
|||
}
|
||||
|
||||
qTaskInfo_t pTaskInfo = NULL;
|
||||
code = qCreateExecTask(streamReadHandle, 0, plan, &pTaskInfo, NULL);
|
||||
code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// TODO: destroy SSubplan & pTaskInfo
|
||||
terrno = code;
|
||||
|
|
|
@ -69,11 +69,11 @@ void freeParam(STaskParam *param) {
|
|||
tfree(param->prevResult);
|
||||
}
|
||||
|
||||
int32_t qCreateExecTask(void* readHandle, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
|
||||
int32_t qCreateExecTask(void* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
|
||||
assert(readHandle != NULL && pSubplan != NULL);
|
||||
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
|
||||
|
||||
int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle);
|
||||
int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -83,8 +83,9 @@ int32_t qCreateExecTask(void* readHandle, int32_t vgId, SSubplan* pSubplan, qTas
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
code = dsCreateDataSinker(pSubplan->pDataSink, handle);
|
||||
if (handle) {
|
||||
code = dsCreateDataSinker(pSubplan->pDataSink, handle);
|
||||
}
|
||||
|
||||
_error:
|
||||
// if failed to add ref for all tables in this query, abort current query
|
||||
|
@ -140,16 +141,10 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
|||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
int64_t threadId = taosGetSelfPthreadId();
|
||||
|
||||
// todo: remove it.
|
||||
if (tinfo == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
*pRes = NULL;
|
||||
|
||||
int64_t curOwner = 0;
|
||||
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
|
||||
qError("QID:0x%" PRIx64 "-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
|
||||
qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
|
||||
(void*)curOwner);
|
||||
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
|
||||
return pTaskInfo->code;
|
||||
|
@ -160,7 +155,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
|||
}
|
||||
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
qDebug("QID:0x%" PRIx64 " it is already killed, abort", GET_TASKID(pTaskInfo));
|
||||
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -169,12 +164,12 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
|||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
publishQueryAbortEvent(pTaskInfo, ret);
|
||||
pTaskInfo->code = ret;
|
||||
qDebug("QID:0x%" PRIx64 " query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo),
|
||||
qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo),
|
||||
tstrerror(pTaskInfo->code));
|
||||
return pTaskInfo->code;
|
||||
}
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " query task is launched", GET_TASKID(pTaskInfo));
|
||||
qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));
|
||||
|
||||
bool newgroup = false;
|
||||
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
|
@ -183,7 +178,9 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
|||
st = taosGetTimestampUs();
|
||||
*pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup);
|
||||
|
||||
pTaskInfo->cost.elapsedTime += (taosGetTimestampUs() - st);
|
||||
uint64_t el = (taosGetTimestampUs() - st);
|
||||
pTaskInfo->cost.elapsedTime += el;
|
||||
|
||||
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
|
||||
if (NULL == *pRes) {
|
||||
|
@ -193,59 +190,13 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
|||
int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0;
|
||||
pTaskInfo->totalRows += current;
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " task paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d",
|
||||
GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0);
|
||||
qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
|
||||
GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0);
|
||||
|
||||
atomic_store_64(&pTaskInfo->owner, 0);
|
||||
return pTaskInfo->code;
|
||||
}
|
||||
|
||||
int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext) {
|
||||
SQInfo *pQInfo = (SQInfo *)qinfo;
|
||||
|
||||
if (pQInfo == NULL) {
|
||||
qError("QInfo invalid qhandle");
|
||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
}
|
||||
|
||||
*buildRes = false;
|
||||
if (IS_QUERY_KILLED(pQInfo)) {
|
||||
qDebug("QID:0x%"PRIx64" query is killed, code:0x%08x", pQInfo->qId, pQInfo->code);
|
||||
return pQInfo->code;
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
if (tsRetrieveBlockingModel) {
|
||||
pQInfo->rspContext = pRspContext;
|
||||
tsem_wait(&pQInfo->ready);
|
||||
*buildRes = true;
|
||||
code = pQInfo->code;
|
||||
} else {
|
||||
STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
STaskAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
|
||||
|
||||
pthread_mutex_lock(&pQInfo->lock);
|
||||
|
||||
assert(pQInfo->rspContext == NULL);
|
||||
if (pQInfo->dataReady == QUERY_RESULT_READY) {
|
||||
*buildRes = true;
|
||||
qDebug("QID:0x%"PRIx64" retrieve result info, rowsize:%d, rows:%d, code:%s", pQInfo->qId, pQueryAttr->resultRowSize,
|
||||
GET_NUM_OF_RESULTS(pRuntimeEnv), tstrerror(pQInfo->code));
|
||||
} else {
|
||||
*buildRes = false;
|
||||
qDebug("QID:0x%"PRIx64" retrieve req set query return result after paused", pQInfo->qId);
|
||||
pQInfo->rspContext = pRspContext;
|
||||
assert(pQInfo->rspContext != NULL);
|
||||
}
|
||||
|
||||
code = pQInfo->code;
|
||||
pthread_mutex_unlock(&pQInfo->lock);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void* qGetResultRetrieveMsg(qTaskInfo_t qinfo) {
|
||||
SQInfo* pQInfo = (SQInfo*) qinfo;
|
||||
assert(pQInfo != NULL);
|
||||
|
@ -260,7 +211,7 @@ int32_t qKillTask(qTaskInfo_t qinfo) {
|
|||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
}
|
||||
|
||||
qDebug("QID:0x%"PRIx64" execTask killed", pTaskInfo->id.queryId);
|
||||
qDebug("%s execTask killed", GET_TASKID(pTaskInfo));
|
||||
setTaskKilled(pTaskInfo);
|
||||
|
||||
// Wait for the query executing thread being stopped/
|
||||
|
@ -279,7 +230,7 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
|
|||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
}
|
||||
|
||||
qDebug("QID:0x%"PRIx64" query async killed", pTaskInfo->id.queryId);
|
||||
qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
|
||||
setTaskKilled(pTaskInfo);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -297,142 +248,12 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
|
|||
|
||||
void qDestroyTask(qTaskInfo_t qTaskHandle) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) qTaskHandle;
|
||||
qDebug("QID:0x%"PRIx64" execTask completed, numOfRows:%"PRId64, pTaskInfo->id.queryId, pTaskInfo->totalRows);
|
||||
qDebug("%s execTask completed, numOfRows:%"PRId64, GET_TASKID(pTaskInfo), pTaskInfo->totalRows);
|
||||
|
||||
queryCostStatis(pTaskInfo); // print the query cost summary
|
||||
doDestroyTask(pTaskInfo);
|
||||
}
|
||||
|
||||
void* qOpenTaskMgmt(int32_t vgId) {
|
||||
const int32_t refreshHandleInterval = 30; // every 30 seconds, refresh handle pool
|
||||
|
||||
char cacheName[128] = {0};
|
||||
sprintf(cacheName, "qhandle_%d", vgId);
|
||||
|
||||
STaskMgmt* pTaskMgmt = calloc(1, sizeof(STaskMgmt));
|
||||
if (pTaskMgmt == NULL) {
|
||||
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pTaskMgmt->qinfoPool = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshHandleInterval, true, freeqinfoFn, cacheName);
|
||||
pTaskMgmt->closed = false;
|
||||
pTaskMgmt->vgId = vgId;
|
||||
|
||||
pthread_mutex_init(&pTaskMgmt->lock, NULL);
|
||||
|
||||
qDebug("vgId:%d, open queryTaskMgmt success", vgId);
|
||||
return pTaskMgmt;
|
||||
}
|
||||
|
||||
void qTaskMgmtNotifyClosing(void* pQMgmt) {
|
||||
if (pQMgmt == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
STaskMgmt* pQueryMgmt = pQMgmt;
|
||||
qInfo("vgId:%d, set querymgmt closed, wait for all queries cancelled", pQueryMgmt->vgId);
|
||||
|
||||
pthread_mutex_lock(&pQueryMgmt->lock);
|
||||
pQueryMgmt->closed = true;
|
||||
pthread_mutex_unlock(&pQueryMgmt->lock);
|
||||
|
||||
taosCacheRefresh(pQueryMgmt->qinfoPool, taskMgmtKillTaskFn, NULL);
|
||||
}
|
||||
|
||||
void qQueryMgmtReOpen(void *pQMgmt) {
|
||||
if (pQMgmt == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
STaskMgmt *pQueryMgmt = pQMgmt;
|
||||
qInfo("vgId:%d, set querymgmt reopen", pQueryMgmt->vgId);
|
||||
|
||||
pthread_mutex_lock(&pQueryMgmt->lock);
|
||||
pQueryMgmt->closed = false;
|
||||
pthread_mutex_unlock(&pQueryMgmt->lock);
|
||||
}
|
||||
|
||||
void qCleanupTaskMgmt(void* pQMgmt) {
|
||||
if (pQMgmt == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
STaskMgmt* pQueryMgmt = pQMgmt;
|
||||
int32_t vgId = pQueryMgmt->vgId;
|
||||
|
||||
assert(pQueryMgmt->closed);
|
||||
|
||||
SCacheObj* pqinfoPool = pQueryMgmt->qinfoPool;
|
||||
pQueryMgmt->qinfoPool = NULL;
|
||||
|
||||
taosCacheCleanup(pqinfoPool);
|
||||
pthread_mutex_destroy(&pQueryMgmt->lock);
|
||||
tfree(pQueryMgmt);
|
||||
|
||||
qDebug("vgId:%d, queryMgmt cleanup completed", vgId);
|
||||
}
|
||||
|
||||
void** qRegisterTask(void* pMgmt, uint64_t qId, void *qInfo) {
|
||||
if (pMgmt == NULL) {
|
||||
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
STaskMgmt *pQueryMgmt = pMgmt;
|
||||
if (pQueryMgmt->qinfoPool == NULL) {
|
||||
qError("QID:0x%"PRIx64"-%p failed to add qhandle into qMgmt, since qMgmt is closed", qId, (void*)qInfo);
|
||||
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&pQueryMgmt->lock);
|
||||
if (pQueryMgmt->closed) {
|
||||
pthread_mutex_unlock(&pQueryMgmt->lock);
|
||||
qError("QID:0x%"PRIx64"-%p failed to add qhandle into cache, since qMgmt is colsing", qId, (void*)qInfo);
|
||||
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||
return NULL;
|
||||
} else {
|
||||
void** handle = taosCachePut(pQueryMgmt->qinfoPool, &qId, sizeof(qId), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE),
|
||||
(getMaximumIdleDurationSec()*1000));
|
||||
pthread_mutex_unlock(&pQueryMgmt->lock);
|
||||
|
||||
return handle;
|
||||
}
|
||||
}
|
||||
|
||||
void** qAcquireTask(void* pMgmt, uint64_t _key) {
|
||||
STaskMgmt *pQueryMgmt = pMgmt;
|
||||
|
||||
if (pQueryMgmt->closed) {
|
||||
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (pQueryMgmt->qinfoPool == NULL) {
|
||||
terrno = TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, &_key, sizeof(_key));
|
||||
if (handle == NULL || *handle == NULL) {
|
||||
terrno = TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
return NULL;
|
||||
} else {
|
||||
return handle;
|
||||
}
|
||||
}
|
||||
|
||||
void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle) {
|
||||
STaskMgmt *pQueryMgmt = pMgmt;
|
||||
if (pQueryMgmt->qinfoPool == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, freeHandle);
|
||||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
//kill by qid
|
||||
int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount) {
|
||||
|
@ -444,7 +265,7 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo
|
|||
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
|
||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
}
|
||||
qWarn("QId:0x%"PRIx64" be killed(no memory commit).", pQInfo->qId);
|
||||
qWarn("%s be killed(no memory commit).", pQInfo->qId);
|
||||
setTaskKilled(pQInfo);
|
||||
|
||||
// wait query stop
|
||||
|
@ -461,4 +282,4 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo
|
|||
return error;
|
||||
}
|
||||
|
||||
#endif
|
||||
#endif
|
||||
|
|
|
@ -2432,10 +2432,6 @@ static bool needBuildResAfterQueryComplete(SQInfo* pQInfo) {
|
|||
}
|
||||
|
||||
bool isTaskKilled(SExecTaskInfo *pTaskInfo) {
|
||||
// if (IS_QUERY_KILLED(pTaskInfo)) {
|
||||
// return true;
|
||||
// }
|
||||
|
||||
// query has been executed more than tsShellActivityTimer, and the retrieve has not arrived
|
||||
// abort current query execution.
|
||||
if (pTaskInfo->owner != 0 && ((taosGetTimestampSec() - pTaskInfo->cost.start/1000) > 10*getMaximumIdleDurationSec())
|
||||
|
@ -4441,9 +4437,9 @@ void queryCostStatis(SExecTaskInfo *pTaskInfo) {
|
|||
//
|
||||
// calculateOperatorProfResults(pQInfo);
|
||||
|
||||
qDebug("QID:0x%"PRIx64" :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, "
|
||||
qDebug("%s :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, "
|
||||
"load block statis:%d, load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64,
|
||||
pTaskInfo->id.queryId, pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis,
|
||||
GET_TASKID(pTaskInfo), pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis,
|
||||
pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows);
|
||||
//
|
||||
//qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
|
||||
|
@ -4994,7 +4990,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
|
|||
pResultRowInfo->curPos = 0;
|
||||
}
|
||||
|
||||
qDebug("QInfo:0x%"PRIx64" start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
|
||||
qDebug("%s start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
|
||||
GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
|
||||
}
|
||||
|
||||
|
@ -5005,7 +5001,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
|
|||
// STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window);
|
||||
// tsdbResetQueryHandle(pTableScanInfo->pTsdbReadHandle, &cond);
|
||||
|
||||
qDebug("QInfo:0x%"PRIx64" start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
|
||||
qDebug("%s start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
|
||||
GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
|
||||
|
||||
if (pResultRowInfo->size > 0) {
|
||||
|
@ -5150,6 +5146,8 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
|
|||
|
||||
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
||||
if (pExchangeInfo->current >= totalSources) {
|
||||
qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", pTaskInfo->id.idstr, totalSources,
|
||||
pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -5170,7 +5168,8 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
|
|||
epSet.port[0] = pSource->addr.epAddr[0].port;
|
||||
tstrncpy(epSet.fqdn[0], pSource->addr.epAddr[0].fqdn, tListLen(epSet.fqdn[0]));
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu,
|
||||
int64_t startTs = taosGetTimestampUs();
|
||||
qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, epSet.fqdn[0], pSource->taskId, pExchangeInfo->current, totalSources);
|
||||
|
||||
pMsg->header.vgId = htonl(pSource->addr.nodeId);
|
||||
|
@ -5181,7 +5180,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
|
|||
// send the fetch remote task result reques
|
||||
pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
|
||||
if (NULL == pMsgSendInfo) {
|
||||
qError("QID:0x%" PRIx64 " prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
|
||||
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
|
||||
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
}
|
||||
|
@ -5198,7 +5197,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
|
|||
|
||||
SRetrieveTableRsp* pRsp = pExchangeInfo->pRsp;
|
||||
if (pRsp->numOfRows == 0) {
|
||||
qDebug("QID:0x%"PRIx64" vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" try next",
|
||||
qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" try next",
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
|
||||
pExchangeInfo->rowsOfCurrentSource, pExchangeInfo->totalRows);
|
||||
|
||||
|
@ -5206,6 +5205,11 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
|
|||
pExchangeInfo->current += 1;
|
||||
|
||||
if (pExchangeInfo->current >= totalSources) {
|
||||
int64_t el = taosGetTimestampUs() - startTs;
|
||||
pExchangeInfo->totalElapsed += el;
|
||||
|
||||
qDebug("%s all %"PRIzu" sources are exhausted, total rows: %"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", pTaskInfo->id.idstr, totalSources,
|
||||
pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0);
|
||||
return NULL;
|
||||
} else {
|
||||
continue;
|
||||
|
@ -5232,21 +5236,24 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
|
|||
pRes->info.numOfCols = pOperator->numOfOutput;
|
||||
pRes->info.rows = pRsp->numOfRows;
|
||||
|
||||
int64_t el = taosGetTimestampUs() - startTs;
|
||||
|
||||
pExchangeInfo->totalRows += pRsp->numOfRows;
|
||||
pExchangeInfo->bytes += pRsp->compLen;
|
||||
pExchangeInfo->totalSize += pRsp->compLen;
|
||||
pExchangeInfo->rowsOfCurrentSource += pRsp->numOfRows;
|
||||
pExchangeInfo->totalElapsed += el;
|
||||
|
||||
if (pRsp->completed == 1) {
|
||||
qDebug("QID:0x%" PRIx64 " fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64
|
||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64
|
||||
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->rowsOfCurrentSource, pExchangeInfo->totalRows, pExchangeInfo->bytes,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->rowsOfCurrentSource, pExchangeInfo->totalRows, pExchangeInfo->totalSize,
|
||||
pExchangeInfo->current + 1, totalSources);
|
||||
|
||||
pExchangeInfo->rowsOfCurrentSource = 0;
|
||||
pExchangeInfo->current += 1;
|
||||
} else {
|
||||
qDebug("QID:0x%" PRIx64 " fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows, pExchangeInfo->bytes);
|
||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows, pExchangeInfo->totalSize);
|
||||
}
|
||||
|
||||
return pExchangeInfo->pResult;
|
||||
|
@ -5293,7 +5300,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray*
|
|||
SRpcInit rpcInit;
|
||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||
rpcInit.localPort = 0;
|
||||
rpcInit.label = "TSC";
|
||||
rpcInit.label = "EX";
|
||||
rpcInit.numOfThreads = 1;
|
||||
rpcInit.cfp = processRspMsg;
|
||||
rpcInit.sessions = tsMaxConnections;
|
||||
|
@ -7728,32 +7735,38 @@ static int32_t deserializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId) {
|
||||
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId) {
|
||||
SExecTaskInfo* pTaskInfo = calloc(1, sizeof(SExecTaskInfo));
|
||||
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
||||
|
||||
pTaskInfo->cost.created = taosGetTimestampMs();
|
||||
pTaskInfo->id.queryId = queryId;
|
||||
pTaskInfo->id.taskId = taskId;
|
||||
|
||||
char* p = calloc(1, 128);
|
||||
snprintf(p, 128, "TID:0x%"PRIx64" QID:0x%"PRIx64, taskId, queryId);
|
||||
pTaskInfo->id.idstr = strdup(p);
|
||||
|
||||
return pTaskInfo;
|
||||
}
|
||||
|
||||
static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode, void* readerHandle, uint64_t queryId);
|
||||
static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode, void* readerHandle, uint64_t queryId, uint64_t taskId);
|
||||
|
||||
SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTaskInfo, void* readerHandle, uint64_t queryId) {
|
||||
SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTaskInfo, void* readerHandle, uint64_t queryId, uint64_t taskId) {
|
||||
if (pPhyNode->pChildren == NULL || taosArrayGetSize(pPhyNode->pChildren) == 0) {
|
||||
if (pPhyNode->info.type == OP_TableScan) {
|
||||
|
||||
SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode;
|
||||
size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets);
|
||||
|
||||
tsdbReadHandleT tReaderHandle = doCreateDataReadHandle((STableScanPhyNode*) pPhyNode, readerHandle, (uint64_t) queryId);
|
||||
tsdbReadHandleT tReaderHandle = doCreateDataReadHandle((STableScanPhyNode*) pPhyNode, readerHandle, (uint64_t) queryId, taskId);
|
||||
|
||||
return createTableScanOperatorInfo(tReaderHandle, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pTaskInfo);
|
||||
} else if (pPhyNode->info.type == OP_DataBlocksOptScan) {
|
||||
SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode;
|
||||
size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets);
|
||||
|
||||
tsdbReadHandleT tReaderHandle = doCreateDataReadHandle((STableScanPhyNode*) pPhyNode, readerHandle, (uint64_t) queryId);
|
||||
tsdbReadHandleT tReaderHandle = doCreateDataReadHandle((STableScanPhyNode*) pPhyNode, readerHandle, (uint64_t) queryId, taskId);
|
||||
|
||||
return createDataBlocksOptScanInfo(tReaderHandle, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo);
|
||||
} else if (pPhyNode->info.type == OP_Exchange) {
|
||||
|
@ -7771,13 +7784,13 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
|
|||
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SPhyNode* pChildNode = taosArrayGetP(pPhyNode->pChildren, i);
|
||||
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, readerHandle, queryId);
|
||||
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, readerHandle, queryId, taskId);
|
||||
return createAggregateOperatorInfo(op, pPhyNode->pTargets, pTaskInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static tsdbReadHandleT createDataReadHandle(STableScanPhyNode* pTableScanNode, STableGroupInfo* pGroupInfo, void* readerHandle, uint64_t queryId) {
|
||||
static tsdbReadHandleT createDataReadHandle(STableScanPhyNode* pTableScanNode, STableGroupInfo* pGroupInfo, void* readerHandle, uint64_t queryId, uint64_t taskId) {
|
||||
STsdbQueryCond cond = {.loadExternalRows = false};
|
||||
|
||||
cond.order = pTableScanNode->scan.order;
|
||||
|
@ -7796,10 +7809,10 @@ static tsdbReadHandleT createDataReadHandle(STableScanPhyNode* pTableScanNode, S
|
|||
cond.colList[i].colId = pSchema->colId;
|
||||
}
|
||||
|
||||
return tsdbQueryTables(readerHandle, &cond, pGroupInfo, queryId, NULL);
|
||||
return tsdbQueryTables(readerHandle, &cond, pGroupInfo, queryId, taskId);
|
||||
}
|
||||
|
||||
static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode, void* readerHandle, uint64_t queryId) {
|
||||
static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode, void* readerHandle, uint64_t queryId, uint64_t taskId) {
|
||||
int32_t code = 0;
|
||||
STableGroupInfo groupInfo = {0};
|
||||
|
||||
|
@ -7826,28 +7839,28 @@ static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode,
|
|||
|
||||
if (groupInfo.numOfTables == 0) {
|
||||
code = 0;
|
||||
qDebug("no table qualified for query, reqId:0x%"PRIx64, queryId);
|
||||
qDebug("no table qualified for query, TID:0x%"PRIx64", QID:0x%"PRIx64, taskId, queryId);
|
||||
goto _error;
|
||||
}
|
||||
|
||||
return createDataReadHandle(pTableScanNode, &groupInfo, readerHandle, queryId);
|
||||
return createDataReadHandle(pTableScanNode, &groupInfo, readerHandle, queryId, taskId);
|
||||
|
||||
_error:
|
||||
terrno = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle) {
|
||||
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle, uint64_t taskId) {
|
||||
uint64_t queryId = pPlan->id.queryId;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
*pTaskInfo = createExecTaskInfo(queryId);
|
||||
*pTaskInfo = createExecTaskInfo(queryId, taskId);
|
||||
if (*pTaskInfo == NULL) {
|
||||
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
goto _complete;
|
||||
}
|
||||
|
||||
(*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, readerHandle, queryId);
|
||||
(*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, readerHandle, queryId, taskId);
|
||||
if ((*pTaskInfo)->pRoot == NULL) {
|
||||
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
goto _complete;
|
||||
|
@ -8814,72 +8827,15 @@ void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols) {
|
|||
}
|
||||
|
||||
void doDestroyTask(SExecTaskInfo *pTaskInfo) {
|
||||
qDebug("QID:0x%"PRIx64" start to free execTask", pTaskInfo->id.queryId);
|
||||
qDebug("%s start to free execTask", GET_TASKID(pTaskInfo));
|
||||
doDestroyTableQueryInfo(&pTaskInfo->tableqinfoGroupInfo);
|
||||
// taosArrayDestroy(pTaskInfo->summary.queryProfEvents);
|
||||
// taosHashCleanup(pTaskInfo->summary.operatorProfResults);
|
||||
|
||||
qDebug("QID:0x%"PRIx64" execTask is freed", pTaskInfo->id.queryId);
|
||||
qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));
|
||||
tfree(pTaskInfo);
|
||||
}
|
||||
|
||||
int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t *compLen) {
|
||||
// the remained number of retrieved rows, not the interpolated result
|
||||
STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
STaskAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
|
||||
|
||||
// load data from file to msg buffer
|
||||
if (pQueryAttr->tsCompQuery) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pRuntimeEnv->outputBuf->pDataBlock, 0);
|
||||
FILE *f = *(FILE **)pColInfoData->pData; // TODO refactor
|
||||
|
||||
// make sure file exist
|
||||
if (f) {
|
||||
off_t s = lseek(fileno(f), 0, SEEK_END);
|
||||
assert(s == pRuntimeEnv->outputBuf->info.rows);
|
||||
|
||||
//qDebug("QInfo:0x%"PRIx64" ts comp data return, file:%p, size:%"PRId64, pQInfo->qId, f, (uint64_t)s);
|
||||
if (fseek(f, 0, SEEK_SET) >= 0) {
|
||||
size_t sz = fread(data, 1, s, f);
|
||||
if(sz < s) { // todo handle error
|
||||
//qError("fread(f:%p,%d) failed, rsize:%" PRId64 ", expect size:%" PRId64, f, fileno(f), (uint64_t)sz, (uint64_t)s);
|
||||
assert(0);
|
||||
}
|
||||
} else {
|
||||
UNUSED(s);
|
||||
//qError("fseek(f:%p,%d) failed, error:%s", f, fileno(f), strerror(errno));
|
||||
assert(0);
|
||||
}
|
||||
|
||||
// dump error info
|
||||
if (s <= (sizeof(STSBufFileHeader) + sizeof(STSGroupBlockInfo) + 6 * sizeof(int32_t))) {
|
||||
// qDump(data, s);
|
||||
assert(0);
|
||||
}
|
||||
|
||||
fclose(f);
|
||||
*(FILE **)pColInfoData->pData = NULL;
|
||||
}
|
||||
|
||||
// all data returned, set query over
|
||||
if (Q_STATUS_EQUAL(pRuntimeEnv->status, TASK_COMPLETED)) {
|
||||
// setTaskStatus(pOperator->pTaskInfo, QUERY_OVER);
|
||||
}
|
||||
} else {
|
||||
doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data, compressed, compLen);
|
||||
}
|
||||
|
||||
//qDebug("QInfo:0x%"PRIx64" current numOfRes rows:%d, total:%" PRId64, pQInfo->qId,
|
||||
// pRuntimeEnv->outputBuf->info.rows, pRuntimeEnv->resultInfo.total);
|
||||
|
||||
if (pQueryAttr->limit.limit > 0 && pQueryAttr->limit.limit == pRuntimeEnv->resultInfo.total) {
|
||||
//qDebug("QInfo:0x%"PRIx64" results limitation reached, limitation:%"PRId64, pQInfo->qId, pQueryAttr->limit.limit);
|
||||
// setTaskStatus(pOperator->pTaskInfo, QUERY_OVER);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
bool doBuildResCheck(SQInfo* pQInfo) {
|
||||
bool buildRes = false;
|
||||
|
||||
|
|
|
@ -219,7 +219,7 @@ TEST(testCase, build_executor_tree_Test) {
|
|||
|
||||
SExecTaskInfo* pTaskInfo = nullptr;
|
||||
DataSinkHandle sinkHandle = nullptr;
|
||||
int32_t code = qCreateExecTask((void*) 1, 2, NULL, (void**) &pTaskInfo, &sinkHandle);
|
||||
int32_t code = qCreateExecTask((void*) 1, 2, 1, NULL, (void**) &pTaskInfo, &sinkHandle);
|
||||
}
|
||||
|
||||
#pragma GCC diagnostic pop
|
|
@ -173,17 +173,17 @@ typedef struct SQWorkerMgmt {
|
|||
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
|
||||
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
|
||||
|
||||
#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
|
||||
#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
|
||||
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
|
||||
#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
|
||||
#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
|
||||
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
|
||||
|
||||
#define QW_TASK_ELOG_E(param) qError("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
|
||||
#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
|
||||
#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
|
||||
#define QW_TASK_ELOG_E(param) qError("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId)
|
||||
#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId)
|
||||
#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId)
|
||||
|
||||
#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
|
||||
#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
|
||||
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
|
||||
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
|
||||
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
|
||||
|
||||
#define QW_LOCK_DEBUG(...) do { if (gQWDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0)
|
||||
|
||||
|
|
|
@ -32,7 +32,7 @@ int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t t
|
|||
int32_t qwBuildAndSendDropRsp(void *connection, int32_t code);
|
||||
int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code);
|
||||
int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code);
|
||||
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len);
|
||||
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
|
||||
int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection);
|
||||
int32_t qwBuildAndSendSchSinkMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection);
|
||||
int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code);
|
||||
|
|
|
@ -456,7 +456,7 @@ _return:
|
|||
QW_RET(code);
|
||||
}
|
||||
|
||||
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
||||
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
|
||||
int32_t code = 0;
|
||||
bool qcontinue = true;
|
||||
SSDataBlock* pRes = NULL;
|
||||
|
@ -467,11 +467,11 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
|||
DataSinkHandle sinkHandle = ctx->sinkHandle;
|
||||
|
||||
while (true) {
|
||||
QW_TASK_DLOG("start to execTask in executor, loopIdx:%d", i++);
|
||||
QW_TASK_DLOG("start to execTask, loopIdx:%d", i++);
|
||||
|
||||
code = qExecTask(*taskHandle, &pRes, &useconds);
|
||||
if (code) {
|
||||
QW_TASK_ELOG("qExecTask failed, code:%x", code);
|
||||
QW_TASK_ELOG("qExecTask failed, code:%s", tstrerror(code));
|
||||
QW_ERR_JRET(code);
|
||||
}
|
||||
|
||||
|
@ -485,6 +485,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
|||
if (TASK_TYPE_TEMP == ctx->taskType) {
|
||||
qwFreeTaskHandle(QW_FPARAMS(), taskHandle);
|
||||
}
|
||||
|
||||
if (queryEnd) {
|
||||
*queryEnd = true;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
@ -587,12 +591,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
|
|||
QW_ERR_RET(code);
|
||||
}
|
||||
|
||||
queryEnd = pOutput->queryEnd;
|
||||
pOutput->queryEnd = false;
|
||||
|
||||
if (DS_BUF_EMPTY == pOutput->bufStatus && queryEnd) {
|
||||
pOutput->queryEnd = true;
|
||||
|
||||
if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
|
||||
QW_SCH_TASK_DLOG("task all fetched, status:%d", JOB_TASK_STATUS_SUCCEED);
|
||||
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED));
|
||||
}
|
||||
|
@ -974,7 +973,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
|
|||
QW_ERR_JRET(code);
|
||||
}
|
||||
|
||||
code = qCreateExecTask(qwMsg->node, 0, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle);
|
||||
code = qCreateExecTask(qwMsg->node, 0, tId, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle);
|
||||
if (code) {
|
||||
QW_TASK_ELOG("qCreateExecTask failed, code:%s", tstrerror(code));
|
||||
QW_ERR_JRET(code);
|
||||
|
@ -996,7 +995,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
|
|||
atomic_store_ptr(&ctx->sinkHandle, sinkHandle);
|
||||
|
||||
if (pTaskInfo && sinkHandle) {
|
||||
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx));
|
||||
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
|
||||
}
|
||||
|
||||
_return:
|
||||
|
@ -1083,6 +1082,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
|||
SQWPhaseOutput output = {0};
|
||||
void *rsp = NULL;
|
||||
int32_t dataLen = 0;
|
||||
bool queryEnd = false;
|
||||
|
||||
do {
|
||||
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, &output));
|
||||
|
@ -1102,7 +1102,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
|||
|
||||
DataSinkHandle sinkHandle = ctx->sinkHandle;
|
||||
|
||||
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx));
|
||||
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryEnd));
|
||||
|
||||
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
|
||||
SOutputData sOutput = {0};
|
||||
|
@ -1114,13 +1114,10 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
|||
// RC WARNING
|
||||
atomic_store_8(&ctx->queryContinue, 1);
|
||||
}
|
||||
|
||||
if (sOutput.queryEnd) {
|
||||
needStop = true;
|
||||
}
|
||||
|
||||
if (rsp) {
|
||||
qwBuildFetchRsp(rsp, &sOutput, dataLen);
|
||||
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
||||
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
|
||||
|
||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
||||
|
||||
|
@ -1131,6 +1128,10 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
if (queryEnd) {
|
||||
needStop = true;
|
||||
}
|
||||
|
||||
_return:
|
||||
|
||||
if (NULL == ctx) {
|
||||
|
@ -1196,7 +1197,8 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
|||
if (NULL == rsp) {
|
||||
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
|
||||
} else {
|
||||
qwBuildFetchRsp(rsp, &sOutput, dataLen);
|
||||
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
||||
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
|
||||
}
|
||||
|
||||
if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
|
||||
|
|
|
@ -26,11 +26,11 @@ int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len) {
|
||||
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) {
|
||||
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
|
||||
|
||||
rsp->useconds = htobe64(input->useconds);
|
||||
rsp->completed = input->queryEnd;
|
||||
rsp->completed = qComplete;
|
||||
rsp->precision = input->precision;
|
||||
rsp->compressed = input->compressed;
|
||||
rsp->compLen = htonl(len);
|
||||
|
@ -258,12 +258,12 @@ int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId,
|
|||
|
||||
int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
QW_SCH_TASK_ELOG("put query continue msg to queue failed, code:%x", code);
|
||||
QW_SCH_TASK_ELOG("put query continue msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code));
|
||||
rpcFreeCont(req);
|
||||
QW_ERR_RET(code);
|
||||
}
|
||||
|
||||
QW_SCH_TASK_DLOG("put query continue msg to query queue, vgId:%d", mgmt->nodeId);
|
||||
QW_SCH_TASK_DLOG("put task continue exec msg to query queue, vgId:%d", mgmt->nodeId);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -1482,13 +1482,14 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
|
|||
}
|
||||
|
||||
int32_t msgSize = sizeof(SSubQueryMsg) + msgLen;
|
||||
msg = calloc(1, msgSize);
|
||||
if (NULL == msg) {
|
||||
qError("calloc %d failed", msgSize);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SSubQueryMsg *pMsg = (SSubQueryMsg*) msg;
|
||||
SSubQueryMsg* pMsg = calloc(1, msgSize);
|
||||
/*SSubQueryMsg *pMsg = (SSubQueryMsg*) msg;*/
|
||||
memcpy(pMsg->msg, msg, msgLen);
|
||||
|
||||
pMsg->header.vgId = tInfo.addr.nodeId;
|
||||
|
||||
|
@ -1497,7 +1498,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
|
|||
pMsg->taskId = schGenUUID();
|
||||
pMsg->taskType = TASK_TYPE_PERSISTENT;
|
||||
pMsg->contentLen = msgLen;
|
||||
memcpy(pMsg->msg, msg, msgLen);
|
||||
/*memcpy(pMsg->msg, ((SSubQueryMsg*)msg)->msg, msgLen);*/
|
||||
|
||||
tInfo.msg = pMsg;
|
||||
|
||||
|
|
|
@ -1360,9 +1360,14 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
|
|||
tDebug("%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", pConn->info, TMSG_INFO(pHead->msgType),
|
||||
pConn->peerFqdn, pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
|
||||
} else {
|
||||
if (pHead->code == 0) pConn->secured = 1; // for success response, set link as secured
|
||||
tDebug("%s, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", pConn->info, TMSG_INFO(pHead->msgType),
|
||||
pConn->peerIp, pConn->peerPort, htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
|
||||
if (pHead->code == 0) {
|
||||
pConn->secured = 1; // for success response, set link as secured
|
||||
}
|
||||
|
||||
char ipport[40] = {0};
|
||||
taosIpPort2String(pConn->peerIp, pConn->peerPort, ipport);
|
||||
tDebug("%s, %s is sent to %s, code:0x%x len:%d sig:0x%08x:0x%08x:%d", pConn->info, TMSG_INFO(pHead->msgType),
|
||||
ipport, htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
|
||||
}
|
||||
|
||||
// tTrace("connection type is: %d", pConn->connType);
|
||||
|
|
|
@ -13,12 +13,12 @@ IF (HEADER_GTEST_INCLUDE_DIR AND (LIB_GTEST_STATIC_DIR OR LIB_GTEST_SHARED_DIR))
|
|||
|
||||
LIST(REMOVE_ITEM SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/trefTest.c)
|
||||
ADD_EXECUTABLE(utilTest ${SOURCE_LIST})
|
||||
TARGET_LINK_LIBRARIES(utilTest util common os gtest pthread gcov)
|
||||
TARGET_LINK_LIBRARIES(utilTest util common os gtest pthread)
|
||||
|
||||
LIST(REMOVE_ITEM SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/cacheTest.cpp)
|
||||
LIST(APPEND SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/hashTest.cpp)
|
||||
ADD_EXECUTABLE(hashTest ${SOURCE_LIST})
|
||||
TARGET_LINK_LIBRARIES(hashTest util common os gtest pthread gcov)
|
||||
TARGET_LINK_LIBRARIES(hashTest util common os gtest pthread)
|
||||
|
||||
LIST(APPEND BIN_SRC ${CMAKE_CURRENT_SOURCE_DIR}/trefTest.c)
|
||||
ADD_EXECUTABLE(trefTest ${BIN_SRC})
|
||||
|
|
Loading…
Reference in New Issue