Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/TD-14761
This commit is contained in:
commit
bdebf9e421
|
@ -87,7 +87,7 @@ taosBenchmark -f <json file>
|
||||||
<summary>subscribe.json</summary>
|
<summary>subscribe.json</summary>
|
||||||
|
|
||||||
```json
|
```json
|
||||||
{{#include /taos-tools/example/tmq.json}}
|
{{#include /taos-tools/example/subscribe.json}}
|
||||||
```
|
```
|
||||||
|
|
||||||
</details>
|
</details>
|
||||||
|
|
|
@ -2994,6 +2994,7 @@ typedef struct {
|
||||||
|
|
||||||
int32_t tEncodeSTaosxRsp(SEncoder* pEncoder, const STaosxRsp* pRsp);
|
int32_t tEncodeSTaosxRsp(SEncoder* pEncoder, const STaosxRsp* pRsp);
|
||||||
int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, STaosxRsp* pRsp);
|
int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, STaosxRsp* pRsp);
|
||||||
|
void tDeleteSTaosxRsp(STaosxRsp* pRsp);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMqRspHead head;
|
SMqRspHead head;
|
||||||
|
|
|
@ -1773,7 +1773,7 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) {
|
||||||
}
|
}
|
||||||
return TMQ_RES_TABLE_META;
|
return TMQ_RES_TABLE_META;
|
||||||
} else if (TD_RES_TMQ_TAOSX(res)) {
|
} else if (TD_RES_TMQ_TAOSX(res)) {
|
||||||
return TMQ_RES_TAOSX;
|
return TMQ_RES_DATA;
|
||||||
} else {
|
} else {
|
||||||
return TMQ_RES_INVALID;
|
return TMQ_RES_INVALID;
|
||||||
}
|
}
|
||||||
|
|
|
@ -5984,6 +5984,17 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) {
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tDeleteSTaosxRsp(STaosxRsp *pRsp) {
|
||||||
|
taosArrayDestroy(pRsp->blockDataLen);
|
||||||
|
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
|
||||||
|
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||||
|
taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree);
|
||||||
|
|
||||||
|
taosArrayDestroy(pRsp->createTableLen);
|
||||||
|
taosArrayDestroyP(pRsp->createTableReq, (FDelete)taosMemoryFree);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) {
|
int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) {
|
||||||
if (tEncodeI64(pEncoder, pReq->uid) < 0) return -1;
|
if (tEncodeI64(pEncoder, pReq->uid) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pReq->ts) < 0) return -1;
|
if (tEncodeI64(pEncoder, pReq->ts) < 0) return -1;
|
||||||
|
|
|
@ -140,11 +140,12 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);
|
||||||
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
|
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
|
||||||
|
|
||||||
// tqRead
|
// tqRead
|
||||||
int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
|
int32_t tqScan(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
|
||||||
|
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset);
|
||||||
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
|
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
|
||||||
|
|
||||||
// tqExec
|
// tqExec
|
||||||
int32_t tqLogScanExec(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, SMqDataRsp* pRsp);
|
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp);
|
||||||
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp);
|
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp);
|
||||||
|
|
||||||
// tqMeta
|
// tqMeta
|
||||||
|
|
|
@ -196,6 +196,66 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) {
|
||||||
|
ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
||||||
|
ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
|
||||||
|
|
||||||
|
if (pRsp->withSchema) {
|
||||||
|
ASSERT(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum);
|
||||||
|
} else {
|
||||||
|
ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
|
||||||
|
if (pRsp->blockNum > 0) {
|
||||||
|
ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);
|
||||||
|
} else {
|
||||||
|
ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t len = 0;
|
||||||
|
int32_t code = 0;
|
||||||
|
tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code);
|
||||||
|
if (code < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
int32_t tlen = sizeof(SMqRspHead) + len;
|
||||||
|
void* buf = rpcMallocCont(tlen);
|
||||||
|
if (buf == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__TAOSX_RSP;
|
||||||
|
((SMqRspHead*)buf)->epoch = pReq->epoch;
|
||||||
|
((SMqRspHead*)buf)->consumerId = pReq->consumerId;
|
||||||
|
|
||||||
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||||
|
|
||||||
|
SEncoder encoder = {0};
|
||||||
|
tEncoderInit(&encoder, abuf, len);
|
||||||
|
tEncodeSTaosxRsp(&encoder, pRsp);
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
|
SRpcMsg rsp = {
|
||||||
|
.info = pMsg->info,
|
||||||
|
.pCont = buf,
|
||||||
|
.contLen = tlen,
|
||||||
|
.code = 0,
|
||||||
|
};
|
||||||
|
tmsgSendRsp(&rsp);
|
||||||
|
|
||||||
|
char buf1[80] = {0};
|
||||||
|
char buf2[80] = {0};
|
||||||
|
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
||||||
|
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
||||||
|
tqDebug("taosx rsp, vgId:%d, from consumer:%" PRId64
|
||||||
|
", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
|
||||||
|
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
|
static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
|
||||||
return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
|
return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
|
||||||
pLeft->val.version <= pRight->val.version;
|
pLeft->val.version <= pRight->val.version;
|
||||||
|
@ -303,6 +363,22 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
|
||||||
|
pRsp->reqOffset = pReq->reqOffset;
|
||||||
|
|
||||||
|
pRsp->withTbName = 1;
|
||||||
|
pRsp->withSchema = 1;
|
||||||
|
pRsp->blockData = taosArrayInit(0, sizeof(void*));
|
||||||
|
pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
|
||||||
|
pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
|
||||||
|
pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
|
||||||
|
|
||||||
|
if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL || pRsp->blockTbName == NULL || pRsp->blockSchema == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SMqPollReq* pReq = pMsg->pCont;
|
SMqPollReq* pReq = pMsg->pCont;
|
||||||
int64_t consumerId = pReq->consumerId;
|
int64_t consumerId = pReq->consumerId;
|
||||||
|
@ -341,9 +417,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
tqDebug("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
|
tqDebug("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
|
||||||
pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
|
pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
|
||||||
|
|
||||||
SMqDataRsp dataRsp = {0};
|
|
||||||
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
|
|
||||||
|
|
||||||
// 2.reset offset if needed
|
// 2.reset offset if needed
|
||||||
if (reqOffset.type > 0) {
|
if (reqOffset.type > 0) {
|
||||||
fetchOffsetNew = reqOffset;
|
fetchOffsetNew = reqOffset;
|
||||||
|
@ -367,6 +440,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal));
|
tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal));
|
||||||
}
|
}
|
||||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
||||||
|
SMqDataRsp dataRsp = {0};
|
||||||
|
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
|
||||||
|
|
||||||
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
||||||
tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId,
|
tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId,
|
||||||
pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
|
pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
|
||||||
|
@ -380,16 +456,38 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
" in vg %d, subkey %s, reset none failed",
|
" in vg %d, subkey %s, reset none failed",
|
||||||
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey);
|
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey);
|
||||||
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
||||||
code = -1;
|
return -1;
|
||||||
tDeleteSMqDataRsp(&dataRsp);
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN || fetchOffsetNew.type != TMQ_OFFSET__LOG) {
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
|
SMqDataRsp dataRsp = {0};
|
||||||
|
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
|
||||||
|
tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew);
|
||||||
|
|
||||||
|
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
||||||
|
code = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid:%ld, version:%ld",
|
||||||
|
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
|
||||||
|
dataRsp.rspOffset.uid, dataRsp.rspOffset.version);
|
||||||
|
|
||||||
|
tDeleteSMqDataRsp(&dataRsp);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
// for taosx
|
||||||
|
ASSERT(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN);
|
||||||
|
|
||||||
SMqMetaRsp metaRsp = {0};
|
SMqMetaRsp metaRsp = {0};
|
||||||
tqScan(pTq, pHandle, &dataRsp, &metaRsp, &fetchOffsetNew);
|
|
||||||
|
STaosxRsp taosxRsp = {0};
|
||||||
|
tqInitTaosxRsp(&taosxRsp, pReq);
|
||||||
|
|
||||||
|
if (fetchOffsetNew.type != TMQ_OFFSET__LOG) {
|
||||||
|
tqScan(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew);
|
||||||
|
|
||||||
if (metaRsp.metaRspLen > 0) {
|
if (metaRsp.metaRspLen > 0) {
|
||||||
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
|
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
|
||||||
|
@ -399,29 +497,30 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
pHandle->subKey, TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
|
pHandle->subKey, TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
|
||||||
metaRsp.rspOffset.version);
|
metaRsp.rspOffset.version);
|
||||||
taosMemoryFree(metaRsp.metaRsp);
|
taosMemoryFree(metaRsp.metaRsp);
|
||||||
goto OVER;
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dataRsp.blockNum > 0) {
|
if (taosxRsp.blockNum > 0) {
|
||||||
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
|
||||||
code = -1;
|
code = -1;
|
||||||
}
|
}
|
||||||
goto OVER;
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
|
return code;
|
||||||
} else {
|
} else {
|
||||||
fetchOffsetNew = dataRsp.rspOffset;
|
fetchOffsetNew = taosxRsp.rspOffset;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%ld,version:%ld",
|
tqDebug("taosx poll: consumer %ld, subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%ld,version:%ld",
|
||||||
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
|
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), taosxRsp.blockNum, taosxRsp.rspOffset.type,
|
||||||
dataRsp.rspOffset.uid, dataRsp.rspOffset.version);
|
taosxRsp.rspOffset.uid, taosxRsp.rspOffset.version);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) {
|
if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {
|
||||||
int64_t fetchVer = fetchOffsetNew.version + 1;
|
int64_t fetchVer = fetchOffsetNew.version + 1;
|
||||||
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
||||||
if (pCkHead == NULL) {
|
if (pCkHead == NULL) {
|
||||||
code = -1;
|
return -1;
|
||||||
goto OVER;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
walSetReaderCapacity(pHandle->pWalReader, 2048);
|
walSetReaderCapacity(pHandle->pWalReader, 2048);
|
||||||
|
@ -436,13 +535,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
|
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
|
||||||
// TODO add push mgr
|
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||||
|
if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
|
||||||
tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer);
|
|
||||||
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
|
||||||
code = -1;
|
code = -1;
|
||||||
}
|
}
|
||||||
goto OVER;
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
|
if (pCkHead) taosMemoryFree(pCkHead);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SWalCont* pHead = &pCkHead->head;
|
SWalCont* pHead = &pCkHead->head;
|
||||||
|
@ -453,17 +552,19 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
||||||
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
|
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
|
||||||
|
|
||||||
if (tqLogScanExec(pTq, pHandle, pCont, &dataRsp) < 0) {
|
if (tqTaosxScanLog(pTq, pHandle, pCont, &taosxRsp) < 0) {
|
||||||
/*ASSERT(0);*/
|
/*ASSERT(0);*/
|
||||||
}
|
}
|
||||||
// TODO batch optimization:
|
// TODO batch optimization:
|
||||||
// TODO continue scan until meeting batch requirement
|
// TODO continue scan until meeting batch requirement
|
||||||
if (dataRsp.blockNum > 0 /* threshold */) {
|
if (taosxRsp.blockNum > 0 /* threshold */) {
|
||||||
tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer);
|
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||||
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
|
||||||
code = -1;
|
code = -1;
|
||||||
}
|
}
|
||||||
goto OVER;
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
|
if (pCkHead) taosMemoryFree(pCkHead);
|
||||||
|
return code;
|
||||||
} else {
|
} else {
|
||||||
fetchVer++;
|
fetchVer++;
|
||||||
}
|
}
|
||||||
|
@ -472,30 +573,22 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
ASSERT(pHandle->fetchMeta);
|
ASSERT(pHandle->fetchMeta);
|
||||||
ASSERT(IS_META_MSG(pHead->msgType));
|
ASSERT(IS_META_MSG(pHead->msgType));
|
||||||
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
|
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
|
||||||
SMqMetaRsp metaRsp = {0};
|
|
||||||
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
|
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
|
||||||
metaRsp.resMsgType = pHead->msgType;
|
metaRsp.resMsgType = pHead->msgType;
|
||||||
metaRsp.metaRspLen = pHead->bodyLen;
|
metaRsp.metaRspLen = pHead->bodyLen;
|
||||||
metaRsp.metaRsp = pHead->body;
|
metaRsp.metaRsp = pHead->body;
|
||||||
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
|
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
|
||||||
code = -1;
|
code = -1;
|
||||||
goto OVER;
|
taosMemoryFree(pCkHead);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
code = 0;
|
code = 0;
|
||||||
goto OVER;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// send empty to client
|
|
||||||
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
|
||||||
code = -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
OVER:
|
|
||||||
if (pCkHead) taosMemoryFree(pCkHead);
|
if (pCkHead) taosMemoryFree(pCkHead);
|
||||||
tDeleteSMqDataRsp(&dataRsp);
|
|
||||||
return code;
|
return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||||
|
@ -602,10 +695,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
|
||||||
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
||||||
pHandle->execHandle.execDb.pFilterOutTbUid =
|
pHandle->execHandle.execDb.pFilterOutTbUid =
|
||||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext **)(&handle.sContext));
|
buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
|
||||||
|
(SSnapContext**)(&handle.sContext));
|
||||||
|
|
||||||
pHandle->execHandle.task =
|
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
|
||||||
qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
|
|
||||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||||
|
|
||||||
|
|
|
@ -60,7 +60,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
|
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
|
||||||
const STqExecHandle* pExec = &pHandle->execHandle;
|
const STqExecHandle* pExec = &pHandle->execHandle;
|
||||||
ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
|
ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
|
||||||
|
|
||||||
|
@ -89,18 +89,41 @@ int64_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
||||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
tqDebug("tmq task execute end, get %p", pDataBlock);
|
tqDebug("tmq task executed, get %p", pDataBlock);
|
||||||
|
|
||||||
|
if (pDataBlock == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if (pDataBlock) {
|
|
||||||
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols);
|
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols);
|
||||||
pRsp->blockNum++;
|
pRsp->blockNum++;
|
||||||
|
|
||||||
|
if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||||
|
rowCnt += pDataBlock->info.rows;
|
||||||
|
if (rowCnt >= 4096) break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
ASSERT(pRsp->rspOffset.type != 0);
|
||||||
|
|
||||||
|
if (pRsp->withTbName) {
|
||||||
|
if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) {
|
||||||
|
int64_t uid = pExec->pExecReader->msgIter.uid;
|
||||||
|
tqAddTbNameToRsp(pTq, uid, pRsp);
|
||||||
|
} else {
|
||||||
|
pRsp->withTbName = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ASSERT(pRsp->withSchema == false);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) {
|
int32_t tqScan(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) {
|
||||||
const STqExecHandle* pExec = &pHandle->execHandle;
|
const STqExecHandle* pExec = &pHandle->execHandle;
|
||||||
qTaskInfo_t task = pExec->task;
|
qTaskInfo_t task = pExec->task;
|
||||||
|
|
||||||
|
@ -134,7 +157,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
|
||||||
int64_t uid = 0;
|
int64_t uid = 0;
|
||||||
if (pOffset->type == TMQ_OFFSET__LOG) {
|
if (pOffset->type == TMQ_OFFSET__LOG) {
|
||||||
uid = pExec->pExecReader->msgIter.uid;
|
uid = pExec->pExecReader->msgIter.uid;
|
||||||
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
|
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -144,18 +167,14 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
|
||||||
}
|
}
|
||||||
if (pRsp->withSchema) {
|
if (pRsp->withSchema) {
|
||||||
if (pOffset->type == TMQ_OFFSET__LOG) {
|
if (pOffset->type == TMQ_OFFSET__LOG) {
|
||||||
tqAddBlockSchemaToRsp(pExec, pRsp);
|
tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);
|
||||||
} else {
|
} else {
|
||||||
SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
|
SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
|
||||||
taosArrayPush(pRsp->blockSchema, &pSW);
|
taosArrayPush(pRsp->blockSchema, &pSW);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
tqAddBlockDataToRsp(pDataBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pDataBlock->pDataBlock));
|
||||||
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols);
|
|
||||||
} else {
|
|
||||||
tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock));
|
|
||||||
}
|
|
||||||
pRsp->blockNum++;
|
pRsp->blockNum++;
|
||||||
if (pOffset->type == TMQ_OFFSET__LOG) {
|
if (pOffset->type == TMQ_OFFSET__LOG) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -165,7 +184,6 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) {
|
|
||||||
if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||||
if (qStreamExtractPrepareUid(task) != 0) {
|
if (qStreamExtractPrepareUid(task) != 0) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -191,7 +209,6 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
|
||||||
|
|
||||||
*pMetaRsp = *tmp;
|
*pMetaRsp = *tmp;
|
||||||
tqDebug("tmqsnap task exec exited, get meta");
|
tqDebug("tmqsnap task exec exited, get meta");
|
||||||
}
|
|
||||||
|
|
||||||
tqDebug("task exec exited");
|
tqDebug("task exec exited");
|
||||||
break;
|
break;
|
||||||
|
@ -205,12 +222,11 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqLogScanExec(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, SMqDataRsp* pRsp) {
|
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp) {
|
||||||
STqExecHandle* pExec = &pHandle->execHandle;
|
STqExecHandle* pExec = &pHandle->execHandle;
|
||||||
ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN);
|
ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN);
|
||||||
|
|
||||||
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
pRsp->withSchema = 1;
|
|
||||||
STqReader* pReader = pExec->pExecReader;
|
STqReader* pReader = pExec->pExecReader;
|
||||||
tqReaderSetDataMsg(pReader, pReq, 0);
|
tqReaderSetDataMsg(pReader, pReq, 0);
|
||||||
while (tqNextDataBlock(pReader)) {
|
while (tqNextDataBlock(pReader)) {
|
||||||
|
@ -220,18 +236,17 @@ int32_t tqLogScanExec(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, SMqDataRsp
|
||||||
}
|
}
|
||||||
if (pRsp->withTbName) {
|
if (pRsp->withTbName) {
|
||||||
int64_t uid = pExec->pExecReader->msgIter.uid;
|
int64_t uid = pExec->pExecReader->msgIter.uid;
|
||||||
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
|
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) {
|
||||||
blockDataFreeRes(&block);
|
blockDataFreeRes(&block);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
|
tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock));
|
||||||
blockDataFreeRes(&block);
|
blockDataFreeRes(&block);
|
||||||
tqAddBlockSchemaToRsp(pExec, pRsp);
|
tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);
|
||||||
pRsp->blockNum++;
|
pRsp->blockNum++;
|
||||||
}
|
}
|
||||||
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
|
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
|
||||||
pRsp->withSchema = 1;
|
|
||||||
STqReader* pReader = pExec->pExecReader;
|
STqReader* pReader = pExec->pExecReader;
|
||||||
tqReaderSetDataMsg(pReader, pReq, 0);
|
tqReaderSetDataMsg(pReader, pReq, 0);
|
||||||
while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
|
while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
|
||||||
|
@ -241,24 +256,25 @@ int32_t tqLogScanExec(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, SMqDataRsp
|
||||||
}
|
}
|
||||||
if (pRsp->withTbName) {
|
if (pRsp->withTbName) {
|
||||||
int64_t uid = pExec->pExecReader->msgIter.uid;
|
int64_t uid = pExec->pExecReader->msgIter.uid;
|
||||||
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
|
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp) < 0) {
|
||||||
blockDataFreeRes(&block);
|
blockDataFreeRes(&block);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
|
tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock));
|
||||||
blockDataFreeRes(&block);
|
blockDataFreeRes(&block);
|
||||||
tqAddBlockSchemaToRsp(pExec, pRsp);
|
tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);
|
||||||
pRsp->blockNum++;
|
pRsp->blockNum++;
|
||||||
}
|
}
|
||||||
#if 0
|
#if 1
|
||||||
if (pHandle->fetchMeta && pRsp->blockNum) {
|
if (pHandle->fetchMeta && pRsp->blockNum) {
|
||||||
SSubmitMsgIter iter = {0};
|
SSubmitMsgIter iter = {0};
|
||||||
tInitSubmitMsgIter(pReq, &iter);
|
tInitSubmitMsgIter(pReq, &iter);
|
||||||
STaosxRsp* pXrsp = (STaosxRsp*)pRsp;
|
STaosxRsp* pXrsp = (STaosxRsp*)pRsp;
|
||||||
while (1) {
|
while (1) {
|
||||||
SSubmitBlk* pBlk = NULL;
|
SSubmitBlk* pBlk = NULL;
|
||||||
if (tGetSubmitMsgNext(&iter, &pBlk) < 0) return -1;
|
if (tGetSubmitMsgNext(&iter, &pBlk) < 0) break;
|
||||||
|
if (pBlk == NULL) break;
|
||||||
if (pBlk->schemaLen > 0) {
|
if (pBlk->schemaLen > 0) {
|
||||||
if (pXrsp->createTableNum == 0) {
|
if (pXrsp->createTableNum == 0) {
|
||||||
pXrsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
|
pXrsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
|
||||||
|
|
|
@ -143,6 +143,7 @@ typedef struct {
|
||||||
STqOffsetVal prepareStatus; // for tmq
|
STqOffsetVal prepareStatus; // for tmq
|
||||||
STqOffsetVal lastStatus; // for tmq
|
STqOffsetVal lastStatus; // for tmq
|
||||||
SMqMetaRsp metaRsp; // for tmq fetching meta
|
SMqMetaRsp metaRsp; // for tmq fetching meta
|
||||||
|
int8_t returned;
|
||||||
int64_t snapshotVer;
|
int64_t snapshotVer;
|
||||||
|
|
||||||
SSchemaWrapper *schema;
|
SSchemaWrapper *schema;
|
||||||
|
|
|
@ -745,6 +745,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
SOperatorInfo* pOperator = pTaskInfo->pRoot;
|
SOperatorInfo* pOperator = pTaskInfo->pRoot;
|
||||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
|
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
|
||||||
pTaskInfo->streamInfo.prepareStatus = *pOffset;
|
pTaskInfo->streamInfo.prepareStatus = *pOffset;
|
||||||
|
pTaskInfo->streamInfo.returned = 0;
|
||||||
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) {
|
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1285,8 +1285,10 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
||||||
if (pResult && pResult->info.rows > 0) {
|
if (pResult && pResult->info.rows > 0) {
|
||||||
qDebug("queue scan tsdb return %d rows", pResult->info.rows);
|
qDebug("queue scan tsdb return %d rows", pResult->info.rows);
|
||||||
|
pTaskInfo->streamInfo.returned = 1;
|
||||||
return pResult;
|
return pResult;
|
||||||
} else {
|
} else {
|
||||||
|
if (!pTaskInfo->streamInfo.returned) {
|
||||||
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
||||||
tsdbReaderClose(pTSInfo->dataReader);
|
tsdbReaderClose(pTSInfo->dataReader);
|
||||||
pTSInfo->dataReader = NULL;
|
pTSInfo->dataReader = NULL;
|
||||||
|
@ -1296,6 +1298,9 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1);
|
ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1);
|
||||||
|
} else {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue