commit
f229242137
|
@ -32,7 +32,8 @@ enum {
|
|||
};
|
||||
|
||||
enum {
|
||||
TMQ_MSG_TYPE__POLL_RSP = 0,
|
||||
TMQ_MSG_TYPE__DUMMY = 0,
|
||||
TMQ_MSG_TYPE__POLL_RSP,
|
||||
TMQ_MSG_TYPE__EP_RSP,
|
||||
};
|
||||
|
||||
|
@ -267,4 +268,4 @@ typedef struct SSessionWindow {
|
|||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_COMMON_DEF_H_*/
|
||||
#endif /*_TD_COMMON_DEF_H_*/
|
||||
|
|
|
@ -70,7 +70,7 @@ typedef uint16_t tmsg_t;
|
|||
|
||||
typedef enum {
|
||||
HEARTBEAT_TYPE_MQ = 0,
|
||||
HEARTBEAT_TYPE_QUERY = 1,
|
||||
HEARTBEAT_TYPE_QUERY,
|
||||
// types can be added here
|
||||
//
|
||||
HEARTBEAT_TYPE_MAX
|
||||
|
|
|
@ -47,12 +47,12 @@ extern "C" {
|
|||
|
||||
typedef struct SAppInstInfo SAppInstInfo;
|
||||
|
||||
typedef struct SHbConnInfo {
|
||||
typedef struct {
|
||||
void* param;
|
||||
SClientHbReq* req;
|
||||
} SHbConnInfo;
|
||||
|
||||
typedef struct SAppHbMgr {
|
||||
typedef struct {
|
||||
char* key;
|
||||
// statistics
|
||||
int32_t reportCnt;
|
||||
|
@ -68,11 +68,11 @@ typedef struct SAppHbMgr {
|
|||
SHashObj* connInfo; // hash<SClientHbKey, SHbConnInfo>
|
||||
} SAppHbMgr;
|
||||
|
||||
typedef int32_t (*FHbRspHandle)(struct SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp);
|
||||
typedef int32_t (*FHbRspHandle)(SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp);
|
||||
|
||||
typedef int32_t (*FHbReqHandle)(SClientHbKey* connKey, void* param, SClientHbReq* req);
|
||||
|
||||
typedef struct SClientHbMgr {
|
||||
typedef struct {
|
||||
int8_t inited;
|
||||
// ctl
|
||||
int8_t threadStop;
|
||||
|
@ -108,13 +108,13 @@ typedef struct SHeartBeatInfo {
|
|||
} SHeartBeatInfo;
|
||||
|
||||
struct SAppInstInfo {
|
||||
int64_t numOfConns;
|
||||
SCorEpSet mgmtEp;
|
||||
SInstanceSummary summary;
|
||||
SList* pConnList; // STscObj linked list
|
||||
int64_t clusterId;
|
||||
void* pTransporter;
|
||||
struct SAppHbMgr* pAppHbMgr;
|
||||
int64_t numOfConns;
|
||||
SCorEpSet mgmtEp;
|
||||
SInstanceSummary summary;
|
||||
SList* pConnList; // STscObj linked list
|
||||
int64_t clusterId;
|
||||
void* pTransporter;
|
||||
SAppHbMgr* pAppHbMgr;
|
||||
};
|
||||
|
||||
typedef struct SAppInfo {
|
||||
|
@ -141,10 +141,6 @@ typedef struct STscObj {
|
|||
SAppInstInfo* pAppInfo;
|
||||
} STscObj;
|
||||
|
||||
typedef struct SMqConsumer {
|
||||
STscObj* pTscObj;
|
||||
} SMqConsumer;
|
||||
|
||||
typedef struct SReqResultInfo {
|
||||
const char* pRspMsg;
|
||||
const char* pData;
|
||||
|
@ -172,7 +168,7 @@ typedef struct SRequestSendRecvBody {
|
|||
SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed.
|
||||
SDataBuf requestMsg;
|
||||
int64_t queryJob; // query job, created according to sql query DAG.
|
||||
struct SQueryDag* pDag; // the query dag, generated according to the sql statement.
|
||||
struct SQueryDag* pDag; // the query dag, generated according to the sql statement.
|
||||
SReqResultInfo resInfo;
|
||||
} SRequestSendRecvBody;
|
||||
|
||||
|
|
|
@ -23,7 +23,7 @@ static SClientHbMgr clientHbMgr = {0};
|
|||
static int32_t hbCreateThread();
|
||||
static void hbStopThread();
|
||||
|
||||
static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; }
|
||||
static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; }
|
||||
|
||||
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
|
||||
int32_t code = 0;
|
||||
|
@ -104,7 +104,7 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
||||
static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
||||
SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
|
||||
if (NULL == info) {
|
||||
tscWarn("fail to get connInfo, may be dropped, connId:%d, type:%d", pRsp->connKey.connId, pRsp->connKey.hbType);
|
||||
|
@ -163,7 +163,7 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp *pRs
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t hbMqAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) {
|
||||
static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) {
|
||||
static int32_t emptyRspNum = 0;
|
||||
if (code != 0) {
|
||||
tfree(param);
|
||||
|
@ -226,7 +226,11 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
|
|||
db->vgVersion = htonl(db->vgVersion);
|
||||
}
|
||||
|
||||
SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = sizeof(SDbVgVersion) * dbNum, .value = dbs};
|
||||
SKv kv = {
|
||||
.key = HEARTBEAT_KEY_DBINFO,
|
||||
.valueLen = sizeof(SDbVgVersion) * dbNum,
|
||||
.value = dbs,
|
||||
};
|
||||
|
||||
tscDebug("hb got %d expired db, valueLen:%d", dbNum, kv.valueLen);
|
||||
|
||||
|
@ -256,7 +260,11 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC
|
|||
stb->tversion = htons(stb->tversion);
|
||||
}
|
||||
|
||||
SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = sizeof(SSTableMetaVersion) * stbNum, .value = stbs};
|
||||
SKv kv = {
|
||||
.key = HEARTBEAT_KEY_STBINFO,
|
||||
.valueLen = sizeof(SSTableMetaVersion) * stbNum,
|
||||
.value = stbs,
|
||||
};
|
||||
|
||||
tscDebug("hb got %d expired stb, valueLen:%d", stbNum, kv.valueLen);
|
||||
|
||||
|
@ -288,7 +296,7 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {}
|
||||
int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; }
|
||||
|
||||
void hbMgrInitMqHbHandle() {
|
||||
clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle;
|
||||
|
@ -396,7 +404,7 @@ static void *hbThreadFunc(void *param) {
|
|||
free(buf);
|
||||
break;
|
||||
}
|
||||
pInfo->fp = hbMqAsyncCallBack;
|
||||
pInfo->fp = hbAsyncCallBack;
|
||||
pInfo->msgInfo.pData = buf;
|
||||
pInfo->msgInfo.len = tlen;
|
||||
pInfo->msgType = TDMT_MND_HEARTBEAT;
|
||||
|
@ -448,7 +456,6 @@ static void hbStopThread() {
|
|||
}
|
||||
|
||||
SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
|
||||
/*return NULL;*/
|
||||
hbMgrInit();
|
||||
SAppHbMgr *pAppHbMgr = malloc(sizeof(SAppHbMgr));
|
||||
if (pAppHbMgr == NULL) {
|
||||
|
@ -507,7 +514,6 @@ void appHbMgrCleanup(void) {
|
|||
}
|
||||
|
||||
int hbMgrInit() {
|
||||
/*return 0;*/
|
||||
// init once
|
||||
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
|
||||
if (old == 1) return 0;
|
||||
|
@ -525,7 +531,7 @@ int hbMgrInit() {
|
|||
}
|
||||
|
||||
void hbMgrCleanUp() {
|
||||
return;
|
||||
#if 0
|
||||
hbStopThread();
|
||||
|
||||
// destroy all appHbMgr
|
||||
|
@ -538,6 +544,7 @@ void hbMgrCleanUp() {
|
|||
pthread_mutex_unlock(&clientHbMgr.lock);
|
||||
|
||||
clientHbMgr.appHbMgrs = NULL;
|
||||
#endif
|
||||
}
|
||||
|
||||
int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) {
|
||||
|
@ -564,9 +571,11 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *
|
|||
}
|
||||
|
||||
int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) {
|
||||
/*return 0;*/
|
||||
SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY};
|
||||
SHbConnInfo info = {0};
|
||||
SClientHbKey connKey = {
|
||||
.connId = connId,
|
||||
.hbType = HEARTBEAT_TYPE_QUERY,
|
||||
};
|
||||
SHbConnInfo info = {0};
|
||||
|
||||
switch (hbType) {
|
||||
case HEARTBEAT_TYPE_QUERY: {
|
||||
|
@ -587,7 +596,6 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int3
|
|||
}
|
||||
|
||||
void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
|
||||
/*return;*/
|
||||
int32_t code = 0;
|
||||
code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
|
||||
code = taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey));
|
||||
|
@ -599,7 +607,6 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
|
|||
|
||||
int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void *key, void *value, int32_t keyLen,
|
||||
int32_t valueLen) {
|
||||
return 0;
|
||||
// find req by connection id
|
||||
SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
|
||||
ASSERT(pReq != NULL);
|
||||
|
|
|
@ -59,6 +59,7 @@ struct tmq_t {
|
|||
char groupId[256];
|
||||
char clientId[256];
|
||||
int8_t autoCommit;
|
||||
int8_t inWaiting;
|
||||
int64_t consumerId;
|
||||
int32_t epoch;
|
||||
int32_t resetOffsetCfg;
|
||||
|
@ -66,9 +67,12 @@ struct tmq_t {
|
|||
STscObj* pTscObj;
|
||||
tmq_commit_cb* commit_cb;
|
||||
int32_t nextTopicIdx;
|
||||
int32_t waitingRequest;
|
||||
int32_t readyRequest;
|
||||
SArray* clientTopics; // SArray<SMqClientTopic>
|
||||
STaosQueue* mqueue; // queue of tmq_message_t
|
||||
STaosQall* qall;
|
||||
tsem_t rspSem;
|
||||
// stat
|
||||
int64_t pollCnt;
|
||||
};
|
||||
|
@ -117,10 +121,12 @@ typedef struct {
|
|||
} SMqAskEpCbParam;
|
||||
|
||||
typedef struct {
|
||||
tmq_t* tmq;
|
||||
SMqClientVg* pVg;
|
||||
int32_t epoch;
|
||||
tsem_t rspSem;
|
||||
tmq_t* tmq;
|
||||
SMqClientVg* pVg;
|
||||
int32_t epoch;
|
||||
tsem_t rspSem;
|
||||
tmq_message_t** msg;
|
||||
int32_t sync;
|
||||
} SMqPollCbParam;
|
||||
|
||||
typedef struct {
|
||||
|
@ -205,16 +211,11 @@ int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
|
||||
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
|
||||
tmq_resp_err_t rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL;
|
||||
pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL;
|
||||
if (pParam->tmq->commit_cb) {
|
||||
pParam->tmq->commit_cb(pParam->tmq, rspErr, NULL, NULL);
|
||||
}
|
||||
if (!pParam->async)
|
||||
tsem_post(&pParam->rspSem);
|
||||
else {
|
||||
tsem_destroy(&pParam->rspSem);
|
||||
free(param);
|
||||
pParam->tmq->commit_cb(pParam->tmq, pParam->rspErr, NULL, NULL);
|
||||
}
|
||||
if (!pParam->async) tsem_post(&pParam->rspSem);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -240,9 +241,12 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
|
|||
return NULL;
|
||||
}
|
||||
pTmq->pTscObj = (STscObj*)conn;
|
||||
pTmq->inWaiting = 0;
|
||||
pTmq->status = 0;
|
||||
pTmq->pollCnt = 0;
|
||||
pTmq->epoch = 0;
|
||||
pTmq->waitingRequest = 0;
|
||||
pTmq->readyRequest = 0;
|
||||
// set conf
|
||||
strcpy(pTmq->clientId, conf->clientId);
|
||||
strcpy(pTmq->groupId, conf->groupId);
|
||||
|
@ -250,6 +254,8 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
|
|||
pTmq->commit_cb = conf->commit_cb;
|
||||
pTmq->resetOffsetCfg = conf->resetOffset;
|
||||
|
||||
tsem_init(&pTmq->rspSem, 0, 0);
|
||||
|
||||
pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
|
||||
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
|
||||
|
||||
|
@ -315,6 +321,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
|
|||
}
|
||||
pParam->tmq = tmq;
|
||||
tsem_init(&pParam->rspSem, 0, 0);
|
||||
pParam->async = async;
|
||||
|
||||
pRequest->body.requestMsg = (SDataBuf){
|
||||
.pData = buf,
|
||||
|
@ -335,6 +342,9 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
|
|||
resp = pParam->rspErr;
|
||||
}
|
||||
|
||||
tsem_destroy(&pParam->rspSem);
|
||||
free(pParam);
|
||||
|
||||
if (pArray) {
|
||||
taosArrayDestroy(pArray);
|
||||
}
|
||||
|
@ -576,7 +586,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
|
|||
|
||||
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
|
||||
if (tmq_message == NULL) return 0;
|
||||
SMqConsumeRsp* pRsp = (SMqConsumeRsp*)tmq_message;
|
||||
SMqConsumeRsp* pRsp = &tmq_message->consumeRsp;
|
||||
return pRsp->skipLogNum;
|
||||
}
|
||||
|
||||
|
@ -625,56 +635,74 @@ void tmqShowMsg(tmq_message_t* tmq_message) {
|
|||
}
|
||||
|
||||
int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
printf("recv poll\n");
|
||||
/*printf("recv poll\n");*/
|
||||
SMqPollCbParam* pParam = (SMqPollCbParam*)param;
|
||||
SMqClientVg* pVg = pParam->pVg;
|
||||
tmq_t* tmq = pParam->tmq;
|
||||
if (code != 0) {
|
||||
printf("msg discard\n");
|
||||
if (pParam->epoch == tmq->epoch) {
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
}
|
||||
return 0;
|
||||
goto WRITE_QUEUE_FAIL;
|
||||
}
|
||||
|
||||
int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
|
||||
int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
|
||||
if (msgEpoch < tmqEpoch) {
|
||||
tsem_post(&tmq->rspSem);
|
||||
printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (msgEpoch != tmqEpoch) {
|
||||
printf("mismatch rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);
|
||||
} else {
|
||||
atomic_sub_fetch_32(&tmq->waitingRequest, 1);
|
||||
}
|
||||
|
||||
#if 0
|
||||
if (pParam->sync == 1) {
|
||||
/**pParam->msg = malloc(sizeof(tmq_message_t));*/
|
||||
*pParam->msg = taosAllocateQitem(sizeof(tmq_message_t));
|
||||
if (*pParam->msg) {
|
||||
memcpy(*pParam->msg, pMsg->pData, sizeof(SMqRspHead));
|
||||
tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &((*pParam->msg)->consumeRsp));
|
||||
if ((*pParam->msg)->consumeRsp.numOfTopics != 0) {
|
||||
pVg->currentOffset = (*pParam->msg)->consumeRsp.rspOffset;
|
||||
}
|
||||
taosWriteQitem(tmq->mqueue, *pParam->msg);
|
||||
tsem_post(&pParam->rspSem);
|
||||
return 0;
|
||||
}
|
||||
tsem_post(&pParam->rspSem);
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
|
||||
/*SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp));*/
|
||||
tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t));
|
||||
if (pRsp == NULL) {
|
||||
printf("fail\n");
|
||||
return -1;
|
||||
goto WRITE_QUEUE_FAIL;
|
||||
}
|
||||
memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));
|
||||
tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp);
|
||||
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
|
||||
if (pRsp->consumeRsp.numOfTopics == 0) {
|
||||
printf("no data\n");
|
||||
if (pParam->epoch == tmq->epoch) {
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
}
|
||||
/*printf("no data\n");*/
|
||||
taosFreeQitem(pRsp);
|
||||
return 0;
|
||||
goto WRITE_QUEUE_FAIL;
|
||||
}
|
||||
|
||||
pRsp->extra = pParam->pVg;
|
||||
taosWriteQitem(tmq->mqueue, pRsp);
|
||||
printf("poll in queue\n");
|
||||
/*pParam->rspMsg = (tmq_message_t*)pRsp;*/
|
||||
/*pVg->currentOffset = pRsp->consumeRsp.rspOffset;*/
|
||||
|
||||
/*printf("rsp offset: %ld\n", rsp.rspOffset);*/
|
||||
/*printf("-----msg begin----\n");*/
|
||||
/*printf("\n-----msg end------\n");*/
|
||||
atomic_add_fetch_32(&tmq->readyRequest, 1);
|
||||
tsem_post(&tmq->rspSem);
|
||||
return 0;
|
||||
|
||||
WRITE_QUEUE_FAIL:
|
||||
if (pParam->epoch == tmq->epoch) {
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
}
|
||||
tsem_post(&tmq->rspSem);
|
||||
return code;
|
||||
}
|
||||
|
||||
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
|
||||
|
@ -711,81 +739,94 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
tmq_t* tmq = pParam->tmq;
|
||||
if (code != 0) {
|
||||
printf("get topic endpoint error, not ready, wait:%d\n", pParam->sync);
|
||||
if (pParam->sync) {
|
||||
tsem_post(&pParam->rspSem);
|
||||
}
|
||||
return 0;
|
||||
goto END;
|
||||
}
|
||||
tscDebug("tmq ask ep cb called");
|
||||
|
||||
// tmq's epoch is monotomically increase,
|
||||
// so it's safe to discard any old epoch msg.
|
||||
// epoch will only increase when received newer epoch ep msg
|
||||
SMqRspHead* head = pMsg->pData;
|
||||
int32_t epoch = atomic_load_32(&tmq->epoch);
|
||||
if (head->epoch <= epoch) {
|
||||
goto END;
|
||||
}
|
||||
|
||||
if (pParam->sync) {
|
||||
SMqRspHead* head = pMsg->pData;
|
||||
SMqCMGetSubEpRsp rsp;
|
||||
tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
|
||||
/*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
|
||||
/*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
|
||||
int32_t epoch = atomic_load_32(&tmq->epoch);
|
||||
if (head->epoch > epoch && tmqUpdateEp(tmq, head->epoch, &rsp)) {
|
||||
if (tmqUpdateEp(tmq, head->epoch, &rsp)) {
|
||||
atomic_store_64(&tmq->status, TMQ_CONSUMER_STATUS__READY);
|
||||
}
|
||||
tsem_post(&pParam->rspSem);
|
||||
tDeleteSMqCMGetSubEpRsp(&rsp);
|
||||
} else {
|
||||
tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t));
|
||||
if (pRsp == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));
|
||||
tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->getEpRsp);
|
||||
|
||||
taosWriteQitem(tmq->mqueue, pRsp);
|
||||
tsem_post(&tmq->rspSem);
|
||||
}
|
||||
return 0;
|
||||
|
||||
END:
|
||||
if (pParam->sync) {
|
||||
tsem_post(&pParam->rspSem);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tmqAskEp(tmq_t* tmq, bool sync) {
|
||||
printf("ask ep sync %d\n", sync);
|
||||
int32_t tlen = sizeof(SMqCMGetSubEpReq);
|
||||
SMqCMGetSubEpReq* buf = malloc(tlen);
|
||||
if (buf == NULL) {
|
||||
SMqCMGetSubEpReq* req = malloc(tlen);
|
||||
if (req == NULL) {
|
||||
tscError("failed to malloc get subscribe ep buf");
|
||||
goto END;
|
||||
return -1;
|
||||
}
|
||||
buf->consumerId = htobe64(tmq->consumerId);
|
||||
buf->epoch = htonl(tmq->epoch);
|
||||
strcpy(buf->cgroup, tmq->groupId);
|
||||
|
||||
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP);
|
||||
if (pRequest == NULL) {
|
||||
tscError("failed to malloc subscribe ep request");
|
||||
goto END;
|
||||
}
|
||||
|
||||
pRequest->body.requestMsg = (SDataBuf){
|
||||
.pData = buf,
|
||||
.len = tlen,
|
||||
.handle = NULL,
|
||||
};
|
||||
req->consumerId = htobe64(tmq->consumerId);
|
||||
req->epoch = htonl(tmq->epoch);
|
||||
strcpy(req->cgroup, tmq->groupId);
|
||||
|
||||
SMqAskEpCbParam* pParam = malloc(sizeof(SMqAskEpCbParam));
|
||||
if (pParam == NULL) {
|
||||
tscError("failed to malloc subscribe param");
|
||||
goto END;
|
||||
free(req);
|
||||
return -1;
|
||||
}
|
||||
pParam->tmq = tmq;
|
||||
pParam->sync = sync;
|
||||
tsem_init(&pParam->rspSem, 0, 0);
|
||||
|
||||
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
|
||||
SMsgSendInfo* sendInfo = malloc(sizeof(SMsgSendInfo));
|
||||
if (sendInfo == NULL) {
|
||||
tsem_destroy(&pParam->rspSem);
|
||||
free(pParam);
|
||||
free(req);
|
||||
return -1;
|
||||
}
|
||||
|
||||
sendInfo->msgInfo = (SDataBuf){
|
||||
.pData = req,
|
||||
.len = tlen,
|
||||
.handle = NULL,
|
||||
};
|
||||
|
||||
sendInfo->requestId = generateRequestId();
|
||||
sendInfo->requestObjRefId = 0;
|
||||
sendInfo->param = pParam;
|
||||
sendInfo->fp = tmqAskEpCb;
|
||||
sendInfo->msgType = TDMT_MND_GET_SUB_EP;
|
||||
|
||||
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
||||
|
||||
int64_t transporterId = 0;
|
||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||
|
||||
END:
|
||||
if (sync) tsem_wait(&pParam->rspSem);
|
||||
return 0;
|
||||
}
|
||||
|
@ -812,7 +853,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
|
|||
return TMQ_RESP_ERR__FAIL;
|
||||
}
|
||||
|
||||
SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClientTopic* pTopic, SMqClientVg* pVg) {
|
||||
SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTopic* pTopic, SMqClientVg* pVg) {
|
||||
int64_t reqOffset;
|
||||
if (pVg->currentOffset >= 0) {
|
||||
reqOffset = pVg->currentOffset;
|
||||
|
@ -832,7 +873,7 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClie
|
|||
strcpy(pReq->topic, pTopic->topicName);
|
||||
strcpy(pReq->cgroup, tmq->groupId);
|
||||
|
||||
pReq->blockingTime = blocking_time;
|
||||
pReq->blockingTime = blockingTime;
|
||||
pReq->consumerId = tmq->consumerId;
|
||||
pReq->epoch = tmq->epoch;
|
||||
pReq->currentOffset = reqOffset;
|
||||
|
@ -862,31 +903,34 @@ void tmqClearUnhandleMsg(tmq_t* tmq) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
|
||||
printf("call poll\n");
|
||||
tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) {
|
||||
tmq_message_t* msg = NULL;
|
||||
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
|
||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
|
||||
if (vgStatus != TMQ_VG_STATUS__IDLE) {
|
||||
continue;
|
||||
}
|
||||
/*if (vgStatus != TMQ_VG_STATUS__IDLE) {*/
|
||||
/*continue;*/
|
||||
/*}*/
|
||||
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
|
||||
if (pReq == NULL) {
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
// TODO: out of mem
|
||||
return -1;
|
||||
return NULL;
|
||||
}
|
||||
SMqPollCbParam* param = malloc(sizeof(SMqPollCbParam));
|
||||
if (param == NULL) {
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
// TODO: out of mem
|
||||
return -1;
|
||||
return NULL;
|
||||
}
|
||||
param->tmq = tmq;
|
||||
param->pVg = pVg;
|
||||
param->epoch = tmq->epoch;
|
||||
param->sync = 1;
|
||||
param->msg = &msg;
|
||||
tsem_init(¶m->rspSem, 0, 0);
|
||||
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
|
||||
pRequest->body.requestMsg = (SDataBuf){
|
||||
.pData = pReq,
|
||||
|
@ -900,7 +944,70 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
|
|||
sendInfo->fp = tmqPollCb;
|
||||
|
||||
int64_t transporterId = 0;
|
||||
printf("send poll\n");
|
||||
/*printf("send poll\n");*/
|
||||
atomic_add_fetch_32(&tmq->waitingRequest, 1);
|
||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
|
||||
pVg->pollCnt++;
|
||||
tmq->pollCnt++;
|
||||
|
||||
tsem_wait(¶m->rspSem);
|
||||
tmq_message_t* nmsg = NULL;
|
||||
while (1) {
|
||||
taosReadQitem(tmq->mqueue, (void**)&nmsg);
|
||||
if (nmsg == NULL) continue;
|
||||
while (nmsg->head.mqMsgType != TMQ_MSG_TYPE__POLL_RSP) {
|
||||
taosReadQitem(tmq->mqueue, (void**)&nmsg);
|
||||
}
|
||||
return nmsg;
|
||||
}
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
|
||||
/*printf("call poll\n");*/
|
||||
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
|
||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
|
||||
if (vgStatus != TMQ_VG_STATUS__IDLE) {
|
||||
continue;
|
||||
}
|
||||
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
|
||||
if (pReq == NULL) {
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
// TODO: out of mem
|
||||
tsem_post(&tmq->rspSem);
|
||||
return -1;
|
||||
}
|
||||
SMqPollCbParam* param = malloc(sizeof(SMqPollCbParam));
|
||||
if (param == NULL) {
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
// TODO: out of mem
|
||||
tsem_post(&tmq->rspSem);
|
||||
return -1;
|
||||
}
|
||||
param->tmq = tmq;
|
||||
param->pVg = pVg;
|
||||
param->epoch = tmq->epoch;
|
||||
param->sync = 0;
|
||||
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
|
||||
pRequest->body.requestMsg = (SDataBuf){
|
||||
.pData = pReq,
|
||||
.len = sizeof(SMqConsumeReq),
|
||||
.handle = NULL,
|
||||
};
|
||||
|
||||
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
|
||||
sendInfo->requestObjRefId = 0;
|
||||
sendInfo->param = param;
|
||||
sendInfo->fp = tmqPollCb;
|
||||
|
||||
int64_t transporterId = 0;
|
||||
/*printf("send poll\n");*/
|
||||
atomic_add_fetch_32(&tmq->waitingRequest, 1);
|
||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
|
||||
pVg->pollCnt++;
|
||||
tmq->pollCnt++;
|
||||
|
@ -912,7 +1019,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
|
|||
// return
|
||||
int32_t tmqHandleRes(tmq_t* tmq, tmq_message_t* rspMsg, bool* pReset) {
|
||||
if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__EP_RSP) {
|
||||
printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);
|
||||
/*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/
|
||||
if (rspMsg->head.epoch > atomic_load_32(&tmq->epoch)) {
|
||||
tmqUpdateEp(tmq, rspMsg->head.epoch, &rspMsg->getEpRsp);
|
||||
tmqClearUnhandleMsg(tmq);
|
||||
|
@ -931,13 +1038,16 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
|
|||
tmq_message_t* rspMsg = NULL;
|
||||
taosGetQitem(tmq->qall, (void**)&rspMsg);
|
||||
if (rspMsg == NULL) {
|
||||
break;
|
||||
taosReadAllQitems(tmq->mqueue, tmq->qall);
|
||||
taosGetQitem(tmq->qall, (void**)&rspMsg);
|
||||
if (rspMsg == NULL) return NULL;
|
||||
}
|
||||
|
||||
if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) {
|
||||
printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);
|
||||
atomic_sub_fetch_32(&tmq->readyRequest, 1);
|
||||
/*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/
|
||||
if (rspMsg->head.epoch == atomic_load_32(&tmq->epoch)) {
|
||||
printf("epoch match\n");
|
||||
/*printf("epoch match\n");*/
|
||||
SMqClientVg* pVg = rspMsg->extra;
|
||||
pVg->currentOffset = rspMsg->consumeRsp.rspOffset;
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
|
@ -947,7 +1057,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
|
|||
taosFreeQitem(rspMsg);
|
||||
}
|
||||
} else {
|
||||
printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);
|
||||
/*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/
|
||||
bool reset = false;
|
||||
tmqHandleRes(tmq, rspMsg, &reset);
|
||||
taosFreeQitem(rspMsg);
|
||||
|
@ -957,36 +1067,59 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
|
|||
}
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) {
|
||||
tmq_message_t* rspMsg = NULL;
|
||||
int64_t startTime = taosGetTimestampMs();
|
||||
|
||||
int64_t status = atomic_load_64(&tmq->status);
|
||||
tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT);
|
||||
|
||||
while (1) {
|
||||
rspMsg = tmqSyncPollImpl(tmq, blocking_time);
|
||||
if (rspMsg && rspMsg->consumeRsp.numOfTopics) {
|
||||
return rspMsg;
|
||||
}
|
||||
|
||||
if (blocking_time != 0) {
|
||||
int64_t endTime = taosGetTimestampMs();
|
||||
if (endTime - startTime > blocking_time) {
|
||||
return NULL;
|
||||
}
|
||||
} else
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
||||
tmq_message_t* rspMsg = NULL;
|
||||
tmq_message_t* rspMsg;
|
||||
int64_t startTime = taosGetTimestampMs();
|
||||
|
||||
// TODO: put into another thread or delayed queue
|
||||
int64_t status = atomic_load_64(&tmq->status);
|
||||
tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT);
|
||||
|
||||
taosGetQitem(tmq->qall, (void**)&rspMsg);
|
||||
if (rspMsg == NULL) {
|
||||
taosReadAllQitems(tmq->mqueue, tmq->qall);
|
||||
rspMsg = tmqHandleAllRsp(tmq, blocking_time, false);
|
||||
if (rspMsg) {
|
||||
return rspMsg;
|
||||
}
|
||||
tmqHandleAllRsp(tmq, blocking_time, false);
|
||||
|
||||
tmqPollImpl(tmq, blocking_time);
|
||||
|
||||
while (1) {
|
||||
/*printf("cycle\n");*/
|
||||
taosReadAllQitems(tmq->mqueue, tmq->qall);
|
||||
rspMsg = tmqHandleAllRsp(tmq, blocking_time, true);
|
||||
if (atomic_load_32(&tmq->waitingRequest) == 0) {
|
||||
tmqPollImpl(tmq, blocking_time);
|
||||
}
|
||||
|
||||
tsem_wait(&tmq->rspSem);
|
||||
|
||||
rspMsg = tmqHandleAllRsp(tmq, blocking_time, false);
|
||||
if (rspMsg) {
|
||||
return rspMsg;
|
||||
}
|
||||
if (blocking_time != 0) {
|
||||
int64_t endTime = taosGetTimestampMs();
|
||||
if (endTime - startTime > blocking_time) {
|
||||
printf("normal exit\n");
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
@ -1127,6 +1260,7 @@ void tmq_message_destroy(tmq_message_t* tmq_message) {
|
|||
if (tmq_message == NULL) return;
|
||||
SMqConsumeRsp* pRsp = &tmq_message->consumeRsp;
|
||||
tDeleteSMqConsumeRsp(pRsp);
|
||||
/*free(tmq_message);*/
|
||||
taosFreeQitem(tmq_message);
|
||||
}
|
||||
|
||||
|
@ -1138,6 +1272,7 @@ const char* tmq_err2str(tmq_resp_err_t err) {
|
|||
}
|
||||
return "fail";
|
||||
}
|
||||
|
||||
#if 0
|
||||
tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
|
||||
tmq_t* pTmq = malloc(sizeof(tmq_t));
|
||||
|
|
|
@ -24,8 +24,8 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define META_SUPER_TABLE TD_SUPER_TABLE
|
||||
#define META_CHILD_TABLE TD_CHILD_TABLE
|
||||
#define META_SUPER_TABLE TD_SUPER_TABLE
|
||||
#define META_CHILD_TABLE TD_CHILD_TABLE
|
||||
#define META_NORMAL_TABLE TD_NORMAL_TABLE
|
||||
|
||||
// Types exported
|
||||
|
@ -50,14 +50,14 @@ int metaDropTable(SMeta *pMeta, tb_uid_t uid);
|
|||
int metaCommit(SMeta *pMeta);
|
||||
|
||||
// For Query
|
||||
STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid);
|
||||
STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid);
|
||||
STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid);
|
||||
STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid);
|
||||
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline);
|
||||
STSchema * metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver);
|
||||
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver);
|
||||
|
||||
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
|
||||
void metaCloseTbCursor(SMTbCursor *pTbCur);
|
||||
char * metaTbCursorNext(SMTbCursor *pTbCur);
|
||||
char *metaTbCursorNext(SMTbCursor *pTbCur);
|
||||
|
||||
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid);
|
||||
void metaCloseCtbCurosr(SMCtbCursor *pCtbCur);
|
||||
|
|
|
@ -213,6 +213,10 @@ static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SA
|
|||
//}
|
||||
|
||||
static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) {
|
||||
if (pHandle->tbIdHash) {
|
||||
taosHashClear(pHandle->tbIdHash);
|
||||
}
|
||||
|
||||
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||
if (pHandle->tbIdHash == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -227,6 +231,23 @@ static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const S
|
|||
return 0;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) {
|
||||
if (pHandle->tbIdHash == NULL) {
|
||||
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||
if (pHandle->tbIdHash == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||
int64_t *pKey = (int64_t *)taosArrayGet(tbUidList, i);
|
||||
taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
|
||||
bool tqNextDataBlock(STqReadHandle *pHandle);
|
||||
int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo);
|
||||
|
|
|
@ -76,4 +76,4 @@ struct SMeta {
|
|||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_META_DEF_H_*/
|
||||
#endif /*_TD_META_DEF_H_*/
|
||||
|
|
|
@ -62,10 +62,10 @@ static int metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *
|
|||
static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
|
||||
static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
|
||||
static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg);
|
||||
static void * metaDecodeTbInfo(void *buf, STbCfg *pTbCfg);
|
||||
static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg);
|
||||
static void metaClearTbCfg(STbCfg *pTbCfg);
|
||||
static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW);
|
||||
static void * metaDecodeSchema(void *buf, SSchemaWrapper *pSW);
|
||||
static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW);
|
||||
static void metaDBWLock(SMetaDB *pDB);
|
||||
static void metaDBRLock(SMetaDB *pDB);
|
||||
static void metaDBULock(SMetaDB *pDB);
|
||||
|
@ -142,7 +142,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
|
|||
tb_uid_t uid;
|
||||
char buf[512];
|
||||
char buf1[512];
|
||||
void * pBuf;
|
||||
void *pBuf;
|
||||
DBT key1, value1;
|
||||
DBT key2, value2;
|
||||
SSchema *pSchema = NULL;
|
||||
|
@ -394,7 +394,7 @@ static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey
|
|||
|
||||
static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
|
||||
STbCfg *pTbCfg = (STbCfg *)(pValue->app_data);
|
||||
DBT * pDbt;
|
||||
DBT *pDbt;
|
||||
|
||||
if (pTbCfg->type == META_CHILD_TABLE) {
|
||||
// pDbt = calloc(2, sizeof(DBT));
|
||||
|
@ -479,7 +479,7 @@ static void metaClearTbCfg(STbCfg *pTbCfg) {
|
|||
|
||||
/* ------------------------ FOR QUERY ------------------------ */
|
||||
STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid) {
|
||||
STbCfg * pTbCfg = NULL;
|
||||
STbCfg *pTbCfg = NULL;
|
||||
SMetaDB *pDB = pMeta->pDB;
|
||||
DBT key = {0};
|
||||
DBT value = {0};
|
||||
|
@ -509,7 +509,7 @@ STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid) {
|
|||
}
|
||||
|
||||
STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) {
|
||||
STbCfg * pTbCfg = NULL;
|
||||
STbCfg *pTbCfg = NULL;
|
||||
SMetaDB *pDB = pMeta->pDB;
|
||||
DBT key = {0};
|
||||
DBT pkey = {0};
|
||||
|
@ -543,10 +543,10 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) {
|
|||
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
|
||||
uint32_t nCols;
|
||||
SSchemaWrapper *pSW = NULL;
|
||||
SMetaDB * pDB = pMeta->pDB;
|
||||
SMetaDB *pDB = pMeta->pDB;
|
||||
int ret;
|
||||
void * pBuf;
|
||||
SSchema * pSchema;
|
||||
void *pBuf;
|
||||
SSchema *pSchema;
|
||||
SSchemaKey schemaKey = {uid, sver, 0};
|
||||
DBT key = {0};
|
||||
DBT value = {0};
|
||||
|
@ -578,7 +578,7 @@ struct SMTbCursor {
|
|||
|
||||
SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
|
||||
SMTbCursor *pTbCur = NULL;
|
||||
SMetaDB * pDB = pMeta->pDB;
|
||||
SMetaDB *pDB = pMeta->pDB;
|
||||
|
||||
pTbCur = (SMTbCursor *)calloc(1, sizeof(*pTbCur));
|
||||
if (pTbCur == NULL) {
|
||||
|
@ -609,7 +609,7 @@ char *metaTbCursorNext(SMTbCursor *pTbCur) {
|
|||
DBT key = {0};
|
||||
DBT value = {0};
|
||||
STbCfg tbCfg;
|
||||
void * pBuf;
|
||||
void *pBuf;
|
||||
|
||||
for (;;) {
|
||||
if (pTbCur->pCur->get(pTbCur->pCur, &key, &value, DB_NEXT) == 0) {
|
||||
|
@ -631,10 +631,10 @@ char *metaTbCursorNext(SMTbCursor *pTbCur) {
|
|||
|
||||
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
|
||||
STSchemaBuilder sb;
|
||||
STSchema * pTSchema = NULL;
|
||||
SSchema * pSchema;
|
||||
STSchema *pTSchema = NULL;
|
||||
SSchema *pSchema;
|
||||
SSchemaWrapper *pSW;
|
||||
STbCfg * pTbCfg;
|
||||
STbCfg *pTbCfg;
|
||||
tb_uid_t quid;
|
||||
|
||||
pTbCfg = metaGetTbInfoByUid(pMeta, uid);
|
||||
|
@ -662,13 +662,13 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
|
|||
}
|
||||
|
||||
struct SMCtbCursor {
|
||||
DBC * pCur;
|
||||
DBC *pCur;
|
||||
tb_uid_t suid;
|
||||
};
|
||||
|
||||
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
|
||||
SMCtbCursor *pCtbCur = NULL;
|
||||
SMetaDB * pDB = pMeta->pDB;
|
||||
SMetaDB *pDB = pMeta->pDB;
|
||||
int ret;
|
||||
|
||||
pCtbCur = (SMCtbCursor *)calloc(1, sizeof(*pCtbCur));
|
||||
|
@ -700,7 +700,7 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
|
|||
DBT skey = {0};
|
||||
DBT pkey = {0};
|
||||
DBT pval = {0};
|
||||
void * pBuf;
|
||||
void *pBuf;
|
||||
STbCfg tbCfg;
|
||||
|
||||
// Set key
|
||||
|
|
|
@ -72,6 +72,8 @@ void tqClose(STQ* pTq) {
|
|||
}
|
||||
|
||||
int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) {
|
||||
// if waiting
|
||||
// memcpy and send msg to fetch thread
|
||||
// TODO: add reference
|
||||
// if handle waiting, launch query and response to consumer
|
||||
//
|
||||
|
@ -210,7 +212,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
SMqConsumeReq* pReq = pMsg->pCont;
|
||||
int64_t consumerId = pReq->consumerId;
|
||||
int64_t fetchOffset;
|
||||
/*int64_t blockingTime = pReq->blockingTime;*/
|
||||
int64_t blockingTime = pReq->blockingTime;
|
||||
|
||||
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
|
||||
fetchOffset = 0;
|
||||
|
|
|
@ -95,17 +95,17 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
|
|||
}
|
||||
|
||||
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isAdd) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo* )tinfo;
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
|
||||
// traverse to the streamscan node to add this table id
|
||||
SOperatorInfo* pInfo = pTaskInfo->pRoot;
|
||||
while(pInfo->operatorType != OP_StreamScan) {
|
||||
while (pInfo->operatorType != OP_StreamScan) {
|
||||
pInfo = pInfo->pDownstream[0];
|
||||
}
|
||||
|
||||
SStreamBlockScanInfo* pScanInfo = pInfo->info;
|
||||
if (isAdd) {
|
||||
int32_t code = tqReadHandleSetTbUidList(pScanInfo->readerHandle, tableIdList);
|
||||
int32_t code = tqReadHandleAddTbUidList(pScanInfo->readerHandle, tableIdList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -114,4 +114,4 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA
|
|||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,23 +1,36 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "os.h"
|
||||
#include "tmsg.h"
|
||||
#include "query.h"
|
||||
#include "tglobal.h"
|
||||
#include "tsched.h"
|
||||
#include "tmsg.h"
|
||||
#include "trpc.h"
|
||||
#include "tsched.h"
|
||||
|
||||
#define VALIDNUMOFCOLS(x) ((x) >= TSDB_MIN_COLUMNS && (x) <= TSDB_MAX_COLUMNS)
|
||||
#define VALIDNUMOFTAGS(x) ((x) >= 0 && (x) <= TSDB_MAX_TAGS)
|
||||
#define VALIDNUMOFCOLS(x) ((x) >= TSDB_MIN_COLUMNS && (x) <= TSDB_MAX_COLUMNS)
|
||||
#define VALIDNUMOFTAGS(x) ((x) >= 0 && (x) <= TSDB_MAX_TAGS)
|
||||
|
||||
static struct SSchema _s = {
|
||||
.colId = TSDB_TBNAME_COLUMN_INDEX,
|
||||
.type = TSDB_DATA_TYPE_BINARY,
|
||||
.type = TSDB_DATA_TYPE_BINARY,
|
||||
.bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE,
|
||||
.name = "tbname",
|
||||
};
|
||||
|
||||
const SSchema* tGetTbnameColumnSchema() {
|
||||
return &_s;
|
||||
}
|
||||
const SSchema* tGetTbnameColumnSchema() { return &_s; }
|
||||
|
||||
static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen) {
|
||||
int32_t rowLen = 0;
|
||||
|
@ -87,7 +100,7 @@ int32_t initTaskQueue() {
|
|||
double factor = 4.0;
|
||||
|
||||
int32_t numOfThreads = TMAX((int)(tsNumOfCores * tsNumOfThreadsPerCore / factor), 2);
|
||||
|
||||
|
||||
int32_t queueSize = tsMaxConnections * 2;
|
||||
pTaskQueue = taosInitScheduler(queueSize, numOfThreads, "tsc");
|
||||
if (NULL == pTaskQueue) {
|
||||
|
@ -96,19 +109,21 @@ int32_t initTaskQueue() {
|
|||
}
|
||||
|
||||
qDebug("task queue is initialized, numOfThreads: %d", numOfThreads);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t cleanupTaskQueue() {
|
||||
taosCleanUpScheduler(pTaskQueue);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void execHelper(struct SSchedMsg* pSchedMsg) {
|
||||
assert(pSchedMsg != NULL && pSchedMsg->ahandle != NULL);
|
||||
|
||||
__async_exec_fn_t execFn = (__async_exec_fn_t) pSchedMsg->ahandle;
|
||||
int32_t code = execFn(pSchedMsg->thandle);
|
||||
__async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
|
||||
int32_t code = execFn(pSchedMsg->thandle);
|
||||
if (code != 0 && pSchedMsg->msg != NULL) {
|
||||
*(int32_t*) pSchedMsg->msg = code;
|
||||
*(int32_t*)pSchedMsg->msg = code;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -116,34 +131,33 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code)
|
|||
assert(execFn != NULL);
|
||||
|
||||
SSchedMsg schedMsg = {0};
|
||||
schedMsg.fp = execHelper;
|
||||
schedMsg.fp = execHelper;
|
||||
schedMsg.ahandle = execFn;
|
||||
schedMsg.thandle = execParam;
|
||||
schedMsg.msg = code;
|
||||
schedMsg.msg = code;
|
||||
|
||||
taosScheduleTask(pTaskQueue, &schedMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) {
|
||||
char *pMsg = rpcMallocCont(pInfo->msgInfo.len);
|
||||
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) {
|
||||
char* pMsg = rpcMallocCont(pInfo->msgInfo.len);
|
||||
if (NULL == pMsg) {
|
||||
qError("0x%"PRIx64" msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType));
|
||||
qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType));
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
memcpy(pMsg, pInfo->msgInfo.pData, pInfo->msgInfo.len);
|
||||
SRpcMsg rpcMsg = {
|
||||
.msgType = pInfo->msgType,
|
||||
.pCont = pMsg,
|
||||
.contLen = pInfo->msgInfo.len,
|
||||
.ahandle = (void*) pInfo,
|
||||
.handle = pInfo->msgInfo.handle,
|
||||
.code = 0
|
||||
};
|
||||
SRpcMsg rpcMsg = {.msgType = pInfo->msgType,
|
||||
.pCont = pMsg,
|
||||
.contLen = pInfo->msgInfo.len,
|
||||
.ahandle = (void*)pInfo,
|
||||
.handle = pInfo->msgInfo.handle,
|
||||
.code = 0};
|
||||
|
||||
assert(pInfo->fp != NULL);
|
||||
|
||||
rpcSendRequest(pTransporter, epSet, &rpcMsg, pTransporterId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -162,8 +162,8 @@ int32_t walEndSnapshot(SWal *pWal) {
|
|||
}
|
||||
// iterate files, until the searched result
|
||||
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
|
||||
if ((pWal->cfg.retentionSize != -1 && pWal->totSize > pWal->cfg.retentionSize)
|
||||
|| (pWal->cfg.retentionPeriod != -1 && iter->closeTs + pWal->cfg.retentionPeriod > ts)) {
|
||||
if ((pWal->cfg.retentionSize != -1 && pWal->totSize > pWal->cfg.retentionSize) ||
|
||||
(pWal->cfg.retentionPeriod != -1 && iter->closeTs + pWal->cfg.retentionPeriod > ts)) {
|
||||
// delete according to file size or close time
|
||||
deleteCnt++;
|
||||
newTotSize -= iter->fileSize;
|
||||
|
@ -279,6 +279,7 @@ int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in
|
|||
} else {
|
||||
// reject skip log or rewrite log
|
||||
// must truncate explicitly first
|
||||
terrno = TSDB_CODE_WAL_INVALID_VER;
|
||||
return -1;
|
||||
}
|
||||
/*if (!tfValid(pWal->pWriteLogTFile)) return -1;*/
|
||||
|
@ -303,16 +304,18 @@ int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in
|
|||
|
||||
if (taosWriteFile(pWal->pWriteLogTFile, &pWal->writeHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
|
||||
// ftruncate
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
|
||||
strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (taosWriteFile(pWal->pWriteLogTFile, (char *)body, bodyLen) != bodyLen) {
|
||||
// ftruncate
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
|
||||
strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
code = walWriteIndex(pWal, index, offset);
|
||||
|
@ -329,7 +332,7 @@ int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in
|
|||
|
||||
pthread_mutex_unlock(&pWal->mutex);
|
||||
|
||||
return code;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void walFsync(SWal *pWal, bool forceFsync) {
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
// clang-format off
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "taoserror.h"
|
||||
|
@ -408,6 +410,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, "Invalid msg type")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, "Unexpected generic error in wal")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, "WAL size exceeds limit")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_INVALID_VER, "WAL use invalid version")
|
||||
|
||||
// tfs
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_FS_APP_ERROR, "tfs out of memory")
|
||||
|
|
Loading…
Reference in New Issue