feat:add snapshot for tmq in stable and db

This commit is contained in:
wangmm0220 2022-08-04 15:01:59 +08:00
parent b0746d9a0b
commit c3cd858a63
14 changed files with 719 additions and 178 deletions

View File

@ -2589,6 +2589,11 @@ enum {
typedef struct { typedef struct {
int8_t type; int8_t type;
union { union {
// snapshot meta
struct {
int64_t muid;
int64_t mversion;
};
// snapshot data // snapshot data
struct { struct {
int64_t uid; int64_t uid;
@ -2913,33 +2918,14 @@ static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
typedef struct { typedef struct {
SMqRspHead head; SMqRspHead head;
int64_t reqOffset; STqOffsetVal rspOffset;
int64_t rspOffset;
STqOffsetVal reqOffsetNew;
STqOffsetVal rspOffsetNew;
int16_t resMsgType; int16_t resMsgType;
int32_t metaRspLen; int32_t metaRspLen;
void* metaRsp; void* metaRsp;
} SMqMetaRsp; } SMqMetaRsp;
static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp) { int32_t tEncodeSMqMetaRsp(SEncoder* pEncoder, const SMqMetaRsp* pRsp);
int32_t tlen = 0; int32_t tDecodeSMqMetaRsp(SDecoder* pDecoder, SMqMetaRsp* pRsp);
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
tlen += taosEncodeFixedI16(buf, pRsp->resMsgType);
tlen += taosEncodeFixedI32(buf, pRsp->metaRspLen);
tlen += taosEncodeBinary(buf, pRsp->metaRsp, pRsp->metaRspLen);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqMetaRsp(const void* buf, SMqMetaRsp* pRsp) {
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf = taosDecodeFixedI16(buf, &pRsp->resMsgType);
buf = taosDecodeFixedI32(buf, &pRsp->metaRspLen);
buf = taosDecodeBinary(buf, &pRsp->metaRsp, pRsp->metaRspLen);
return (void*)buf;
}
typedef struct { typedef struct {
SMqRspHead head; SMqRspHead head;

View File

@ -41,6 +41,8 @@ typedef struct SReadHandle {
bool initTableReader; bool initTableReader;
bool initTqReader; bool initTqReader;
int32_t numOfVgroups; int32_t numOfVgroups;
void* sContext; // SSnapContext*
} SReadHandle; } SReadHandle;
// in queue mode, data streams are seperated by msg // in queue mode, data streams are seperated by msg
@ -172,7 +174,7 @@ int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts);
int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts); int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts);
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset); int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType);
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);

View File

@ -1210,7 +1210,10 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
} else { } else {
ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP); ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp); SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
tDecodeSMqMetaRsp(&decoder, &pRspWrapper->metaRsp);
tDecoderClear(&decoder);
memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead)); memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
} }
@ -1758,8 +1761,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
SMqClientVg* pVg = pollRspWrapper->vgHandle; SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset, /*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
* rspMsg->msg.rspOffset);*/ * rspMsg->msg.rspOffset);*/
pVg->currentOffsetNew.version = pollRspWrapper->metaRsp.rspOffset; pVg->currentOffsetNew = pollRspWrapper->metaRsp.rspOffset;
pVg->currentOffsetNew.type = TMQ_OFFSET__LOG;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// build rsp // build rsp
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);

View File

@ -5603,8 +5603,6 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
snprintf(buf, maxLen, "offset(log) ver:%" PRId64, pVal->version); snprintf(buf, maxLen, "offset(log) ver:%" PRId64, pVal->version);
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA) { } else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA) {
snprintf(buf, maxLen, "offset(ss data) uid:%" PRId64 ", ts:%" PRId64, pVal->uid, pVal->ts); snprintf(buf, maxLen, "offset(ss data) uid:%" PRId64 ", ts:%" PRId64, pVal->uid, pVal->ts);
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_META) {
snprintf(buf, maxLen, "offset(ss meta) uid:%" PRId64 ", ts:%" PRId64, pVal->uid, pVal->ts);
} else { } else {
ASSERT(0); ASSERT(0);
} }
@ -5619,9 +5617,7 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
} else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_DATA) { } else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_DATA) {
return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts; return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts;
} else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_META) { } else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_META) {
ASSERT(0); return pLeft->muid == pRight->muid && pLeft->mversion == pRight->mversion;
// TODO
return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts;
} else { } else {
ASSERT(0); ASSERT(0);
/*ASSERT(pLeft->type == TMQ_OFFSET__RESET_NONE || pLeft->type == TMQ_OFFSET__RESET_EARLIEAST ||*/ /*ASSERT(pLeft->type == TMQ_OFFSET__RESET_NONE || pLeft->type == TMQ_OFFSET__RESET_EARLIEAST ||*/
@ -5706,6 +5702,21 @@ int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) {
if (tDecodeCStrTo(pCoder, pRes->tsColName) < 0) return -1; if (tDecodeCStrTo(pCoder, pRes->tsColName) < 0) return -1;
return 0; return 0;
} }
int32_t tEncodeSMqMetaRsp(SEncoder* pEncoder, const SMqMetaRsp* pRsp) {
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1;
if(tEncodeI16(pEncoder, pRsp->resMsgType)) return -1;
if(tEncodeBinary(pEncoder, pRsp->metaRsp, pRsp->metaRspLen)) return -1;
return 0;
}
int32_t tDecodeSMqMetaRsp(SDecoder* pDecoder, SMqMetaRsp* pRsp) {
if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1;
if (tDecodeI16(pDecoder, &pRsp->resMsgType) < 0) return -1;
if (tDecodeBinaryAlloc(pDecoder, &pRsp->metaRsp, (uint64_t*)&pRsp->metaRspLen) < 0) return -1;
return 0;
}
int32_t tEncodeSMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { int32_t tEncodeSMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1; if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1;
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1; if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1;

View File

@ -152,6 +152,23 @@ void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
size_t tsdbCacheGetCapacity(SVnode *pVnode); size_t tsdbCacheGetCapacity(SVnode *pVnode);
// tq // tq
typedef struct SMetaTableInfo{
int64_t suid;
int64_t uid;
SSchemaWrapper *schema;
}SMetaTableInfo;
typedef struct SSnapContext {
SMeta *pMeta;
int64_t snapVersion;
TBC *pCur;
int64_t suid;
int8_t subType;
SHashObj *idVersion;
SHashObj *suidInfo;
bool queryMetaOrData; // true-get meta, false-get data
}SSnapContext;
typedef struct STqReader { typedef struct STqReader {
int64_t ver; int64_t ver;
@ -200,6 +217,12 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot); int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot);
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData); int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData);
int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, SSnapContext* ctx);
int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, int16_t *type);
SMetaTableInfo getUidfromSnapShot(SSnapContext* ctx);
int32_t setMetaForSnapShot(SSnapContext* ctx, int64_t uid, int64_t ver);
int32_t setDataForSnapShot(SSnapContext* ctx, int64_t uid);
// structs // structs
struct STsdbCfg { struct STsdbCfg {
int8_t precision; int8_t precision;

View File

@ -68,21 +68,21 @@ typedef struct {
typedef struct { typedef struct {
char* qmsg; char* qmsg;
qTaskInfo_t task;
} STqExecCol; } STqExecCol;
typedef struct { typedef struct {
int64_t suid; int64_t suid;
} STqExecTb; } STqExecTb;
typedef struct { typedef struct {
SHashObj* pFilterOutTbUid; SHashObj* pFilterOutTbUid;
} STqExecDb; } STqExecDb;
typedef struct { typedef struct {
int8_t subType; int8_t subType;
STqReader* pExecReader; STqReader* pExecReader;
qTaskInfo_t task;
union { union {
STqExecCol execCol; STqExecCol execCol;
STqExecTb execTb; STqExecTb execTb;
@ -101,6 +101,7 @@ typedef struct {
int64_t snapshotVer; int64_t snapshotVer;
SSnapContext* sContext;
// TODO remove // TODO remove
SWalReader* pWalReader; SWalReader* pWalReader;
@ -175,6 +176,12 @@ static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t u
pOffsetVal->ts = ts; pOffsetVal->ts = ts;
} }
static FORCE_INLINE void tqOffsetResetToMeta(STqOffsetVal* pOffsetVal, int64_t uid, int64_t version) {
pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_META;
pOffsetVal->muid = uid;
pOffsetVal->mversion = version;
}
static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ver) { static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ver) {
pOffsetVal->type = TMQ_OFFSET__LOG; pOffsetVal->type = TMQ_OFFSET__LOG;
pOffsetVal->version = ver; pOffsetVal->version = ver;

View File

@ -195,3 +195,318 @@ _err:
metaError("vgId:%d, vnode snapshot meta write failed since %s", TD_VID(pMeta->pVnode), tstrerror(code)); metaError("vgId:%d, vnode snapshot meta write failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
return code; return code;
} }
typedef struct STableInfoForChildTable{
char *tableName;
SArray *tagName;
SSchemaWrapper *schemaRow;
}STableInfoForChildTable;
static void destroySTableInfoForChildTable(void* data) {
STableInfoForChildTable* pData = (STableInfoForChildTable*)data;
taosMemoryFree(pData->tagName);
taosArrayDestroy(pData->tagName);
tDeleteSSchemaWrapper(pData->schemaRow);
}
int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, SSnapContext* ctx){
ctx->pMeta = pMeta;
ctx->snapVersion = snapVersion;
ctx->suid = suid;
ctx->subType = subType;
ctx->queryMetaOrData = withMeta;
int32_t ret = tdbTbcOpen(pMeta->pTbDb, &ctx->pCur, NULL);
if (ret < 0) {
return -1;
}
ctx->idVersion = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if(ctx->idVersion == NULL){
return -1;
}
ctx->suidInfo = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if(ctx->suidInfo == NULL){
return -1;
}
taosHashSetFreeFp(ctx->suidInfo, destroySTableInfoForChildTable);
void *pKey = NULL;
void *pVal = NULL;
int vLen, kLen;
tdbTbcMoveToFirst(ctx->pCur);
while(1){
ret = tdbTbcNext(ctx->pCur, &pKey, &kLen, &pVal, &vLen);
if (ret < 0) break;
STbDbKey *tmp = (STbDbKey*)pKey;
if(tmp->version > ctx->snapVersion) break;
taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &tmp->version, sizeof(int64_t));
}
tdbTbcMoveToFirst(ctx->pCur);
return TDB_CODE_SUCCESS;
}
int32_t destroySnapContext(SSnapContext* ctx){
tdbTbcClose(ctx->pCur);
taosHashCleanup(ctx->idVersion);
taosHashCleanup(ctx->suidInfo);
return 0;
}
static int32_t buildNormalChildTableInfo(SVCreateTbReq *req, void **pBuf, int32_t *contLen){
int32_t ret = 0;
SVCreateTbBatchReq reqs = {};
reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
if (NULL == reqs.pArray){
ret = -1;
goto end;
}
taosArrayPush(reqs.pArray, &req);
reqs.nReqs = 1;
tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret);
if(ret < 0){
ret = -1;
goto end;
}
*contLen += sizeof(SMsgHead);
*pBuf = taosMemoryMalloc(*contLen);
if (NULL == *pBuf) {
ret = -1;
goto end;
}
SEncoder coder = {0};
tEncoderInit(&coder, *pBuf + sizeof(SMsgHead), *contLen);
if (tEncodeSVCreateTbBatchReq(&coder, &reqs) < 0) {
taosMemoryFreeClear(*pBuf);
tEncoderClear(&coder);
ret = -1;
goto end;
}
tEncoderClear(&coder);
end:
taosArrayDestroy(reqs.pArray);
return ret;
}
static int32_t buildSuperTableInfo(SVCreateStbReq *req, void **pBuf, int32_t *contLen){
int32_t ret = 0;
tEncodeSize(tEncodeSVCreateStbReq, req, *contLen, ret);
if (ret < 0) {
return -1;
}
*contLen += sizeof(SMsgHead);
*pBuf = taosMemoryMalloc(*contLen);
if (NULL == *pBuf) {
return -1;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, *pBuf + sizeof(SMsgHead), *contLen);
if (tEncodeSVCreateStbReq(&encoder, req) < 0) {
taosMemoryFreeClear(*pBuf);
tEncoderClear(&encoder);
return -1;
}
tEncoderClear(&encoder);
return 0;
}
static void saveSuperTableInfoForChildTable(SMetaEntry *me, SHashObj *suidInfo){
STableInfoForChildTable dataTmp = {0};
dataTmp.tableName = strdup(me->name);
dataTmp.tagName = taosArrayInit(me->stbEntry.schemaTag.nCols, TSDB_COL_NAME_LEN);
for(int i = 0; i < me->stbEntry.schemaTag.nCols; i++){
SSchema *schema = &me->stbEntry.schemaTag.pSchema[i];
taosArrayPush(dataTmp.tagName, schema->name);
}
dataTmp.schemaRow = tCloneSSchemaWrapper(&me->stbEntry.schemaRow);
STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(suidInfo, &me->uid, sizeof(tb_uid_t));
if(data){
destroySTableInfoForChildTable(data);
}
taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable));
}
int32_t setMetaForSnapShot(SSnapContext* ctx, int64_t uid, int64_t ver){
int c = 0;
ctx->queryMetaOrData = true; // change to get data
if(uid == 0 && ver == 0){
tdbTbcMoveToFirst(ctx->pCur);
return c;
}
STbDbKey key = {.version = ver, .uid = uid};
tdbTbcMoveTo(ctx->pCur, &key, sizeof(key), &c);
return c;
}
int32_t setDataForSnapShot(SSnapContext* ctx, int64_t uid){
int c = 0;
ctx->queryMetaOrData = false; // change to get data
if(uid == 0){
tdbTbcMoveToFirst(ctx->pCur);
return c;
}
int64_t* ver = (int64_t*)taosHashGet(ctx->idVersion, &uid, sizeof(tb_uid_t));
if(!ver){
return -1;
}
STbDbKey key = {.version = *ver, .uid = uid};
tdbTbcMoveTo(ctx->pCur, &key, sizeof(key), &c);
return c;
}
int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, int16_t *type){
int32_t ret = 0;
void *pKey = NULL;
void *pVal = NULL;
int vLen, kLen;
while(1){
ret = tdbTbcNext(ctx->pCur, &pKey, &kLen, &pVal, &vLen);
if (ret < 0) {
ctx->queryMetaOrData = false; // change to get data
tdbTbcMoveToFirst(ctx->pCur);
return 0;
}
STbDbKey *tmp = (STbDbKey*)pKey;
if(tmp->version > ctx->snapVersion) {
tdbTbcMoveToFirst(ctx->pCur);
ctx->queryMetaOrData = false; // change to get data
return 0;
}
int64_t* ver = (int64_t*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
ASSERT(ver);
if(*ver > tmp->version){
continue;
}
ASSERT(*ver == tmp->version);
SDecoder dc = {0};
SMetaEntry me = {0};
tDecoderInit(&dc, pVal, vLen);
metaDecodeEntry(&dc, &me);
if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE)
|| (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
saveSuperTableInfoForChildTable(&me, ctx->suidInfo);
SVCreateStbReq req = {0};
req.name = me.name;
req.suid = me.uid;
req.schemaRow = me.stbEntry.schemaRow;
req.schemaTag = me.stbEntry.schemaTag;
ret = buildSuperTableInfo(&req, pBuf, contLen);
tDecoderClear(&dc);
*type = TDMT_VND_CREATE_STB;
break;
} else if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE)
|| (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) {
STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
ASSERT(data);
SVCreateTbReq req = {0};
req.type = TD_CHILD_TABLE;
req.name = me.name;
req.uid = me.uid;
req.commentLen = -1;
req.ctb.suid = me.ctbEntry.suid;
req.ctb.tagNum = taosArrayGetSize(data->tagName);
req.ctb.name = data->tableName;
req.ctb.pTag = me.ctbEntry.pTags;
req.ctb.tagName = data->tagName;
ret = buildNormalChildTableInfo(&req, pBuf, contLen);
tDecoderClear(&dc);
*type = TDMT_VND_CREATE_TABLE;
break;
} else if(ctx->subType == TOPIC_SUB_TYPE__DB){
SVCreateTbReq req = {0};
req.type = TD_NORMAL_TABLE;
req.name = me.name;
req.uid = me.uid;
req.commentLen = -1;
req.ntb.schemaRow = me.ntbEntry.schemaRow;
ret = buildNormalChildTableInfo(&req, pBuf, contLen);
tDecoderClear(&dc);
*type = TDMT_VND_CREATE_TABLE;
break;
} else{
tDecoderClear(&dc);
continue;
}
}
return ret;
}
SMetaTableInfo getUidfromSnapShot(SSnapContext* ctx){
SMetaTableInfo result = {0};
int32_t ret = 0;
void *pKey = NULL;
void *pVal = NULL;
int vLen, kLen;
while(1){
ret = tdbTbcNext(ctx->pCur, &pKey, &kLen, &pVal, &vLen);
if (ret < 0) {
return result;
}
STbDbKey *tmp = (STbDbKey*)pKey;
if(tmp->version > ctx->snapVersion) {
return result;
}
int64_t* ver = (int64_t*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
ASSERT(ver);
if(*ver > tmp->version){
continue;
}
ASSERT(*ver == tmp->version);
SDecoder dc = {0};
SMetaEntry me = {0};
tDecoderInit(&dc, pVal, vLen);
metaDecodeEntry(&dc, &me);
if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE){
STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
result.uid = me.uid;
result.suid = me.ctbEntry.suid;
result.schema = data->schemaRow;
tDecoderClear(&dc);
break;
} else if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_NORMAL_TABLE) {
result.uid = me.uid;
result.suid = 0;
result.schema = &me.ntbEntry.schemaRow;
tDecoderClear(&dc);
break;
} else if(ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid) {
STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
result.uid = me.uid;
result.suid = me.ctbEntry.suid;
result.schema = data->schemaRow;
tDecoderClear(&dc);
break;
} else{
tDecoderClear(&dc);
continue;
}
}
return result;
}

View File

@ -100,7 +100,13 @@ void tqClose(STQ* pTq) {
} }
int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) { int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) {
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqMetaRsp(NULL, pRsp); int32_t len = 0;
int32_t code = 0;
tEncodeSize(tEncodeSMqMetaRsp, pRsp, len, code);
if (code < 0) {
return -1;
}
int32_t tlen = sizeof(SMqRspHead) + len;
void* buf = rpcMallocCont(tlen); void* buf = rpcMallocCont(tlen);
if (buf == NULL) { if (buf == NULL) {
return -1; return -1;
@ -111,7 +117,11 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq,
((SMqRspHead*)buf)->consumerId = pReq->consumerId; ((SMqRspHead*)buf)->consumerId = pReq->consumerId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqMetaRsp(&abuf, pRsp);
SEncoder encoder = {0};
tEncoderInit(&encoder, abuf, len);
tEncodeSMqMetaRsp(&encoder, pRsp);
tEncoderClear(&encoder);
SRpcMsg resp = { SRpcMsg resp = {
.info = pMsg->info, .info = pMsg->info,
@ -121,9 +131,8 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq,
}; };
tmsgSendRsp(&resp); tmsgSendRsp(&resp);
tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) send rsp, res msg type %d, reqOffset:%" PRId64 tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) send rsp, res msg type %d, offset type:%d",
", rspOffset:%" PRId64, TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->reqOffset, pRsp->rspOffset);
return 0; return 0;
} }
@ -336,12 +345,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
TD_VID(pTq->pVnode), formatBuf); TD_VID(pTq->pVnode), formatBuf);
} else { } else {
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
if (pReq->useSnapshot && pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pReq->useSnapshot){
if (!pHandle->fetchMeta) { if (pHandle->fetchMeta){
tqOffsetResetToData(&fetchOffsetNew, 0, 0); tqOffsetResetToMeta(&fetchOffsetNew, 0, 0);
} else { } else {
// reset to meta tqOffsetResetToData(&fetchOffsetNew, 0, 0);
ASSERT(0);
} }
} else { } else {
tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal)); tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal));
@ -385,7 +393,56 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
goto OVER; goto OVER;
} }
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) { if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA)) {
if (tqScan(pTq, pHandle, &dataRsp, &fetchOffsetNew) < 0) {
ASSERT(0);
}
if (dataRsp.blockNum == 0) {
// TODO add to async task pool
}
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
goto OVER;
}
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_META)) {
SSnapContext* sContext = pHandle->sContext;
if(setMetaForSnapShot(sContext, fetchOffsetNew.muid, fetchOffsetNew.mversion) != 0) {
qError("setMetaForSnapShot error. uid:%"PRIi64" ,version:%"PRIi64, fetchOffsetNew.muid, fetchOffsetNew.mversion);
code = -1;
goto OVER;
}
void* data = NULL;
int32_t dataLen = 0;
int16_t type = 0;
if(getMetafromSnapShot(sContext, &data, &dataLen, &type) < 0){
qError("getMetafromSnapShot error");
taosMemoryFreeClear(data);
code = -1;
goto OVER;
}
if(!sContext->queryMetaOrData){ // change to get data next poll request
fetchOffsetNew.type = TMQ_OFFSET__SNAPSHOT_DATA;
fetchOffsetNew.uid = 0;
fetchOffsetNew.ts = 0;
}
SMqMetaRsp metaRsp = {0};
metaRsp.rspOffset = fetchOffsetNew;
metaRsp.resMsgType = type;
metaRsp.metaRspLen = dataLen;
metaRsp.metaRsp = data;
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
taosMemoryFreeClear(data);
code = -1;
goto OVER;
}
taosMemoryFreeClear(data);
code = 0;
goto OVER;
}
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && (fetchOffsetNew.type == TMQ_OFFSET__LOG)) {
int64_t fetchVer = fetchOffsetNew.version + 1; int64_t fetchVer = fetchOffsetNew.version + 1;
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) { if (pCkHead == NULL) {
@ -445,11 +502,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
ASSERT(IS_META_MSG(pHead->msgType)); ASSERT(IS_META_MSG(pHead->msgType));
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
SMqMetaRsp metaRsp = {0}; SMqMetaRsp metaRsp = {0};
/*metaRsp.reqOffset = pReq->reqOffset.version;*/ tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
metaRsp.rspOffset = fetchVer;
/*metaRsp.rspOffsetNew.version = fetchVer;*/
tqOffsetResetToLog(&metaRsp.reqOffsetNew, pReq->reqOffset.version);
tqOffsetResetToLog(&metaRsp.rspOffsetNew, fetchVer);
metaRsp.resMsgType = pHead->msgType; metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen; metaRsp.metaRspLen = pHead->bodyLen;
metaRsp.metaRsp = pHead->body; metaRsp.metaRsp = pHead->body;
@ -461,22 +514,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
goto OVER; goto OVER;
} }
} }
taosMemoryFree(pCkHead);
#if 0
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqInfo("retrieve using snapshot actual offset: uid %" PRId64 " ts %" PRId64, fetchOffsetNew.uid, fetchOffsetNew.ts);
if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) {
ASSERT(0);
}
// 4. send rsp
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
#endif
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_META) {
ASSERT(0);
} }
OVER: OVER:
@ -553,23 +590,25 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
int64_t ver = pRef->refVer; int64_t ver = pRef->refVer;
pHandle->pRef = pRef; pHandle->pRef = pRef;
SReadHandle handle = {
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.initTableReader = true,
.initTqReader = true,
.version = ver,
};
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
pHandle->execHandle.execCol.qmsg = req.qmsg; pHandle->execHandle.execCol.qmsg = req.qmsg;
pHandle->snapshotVer = ver; pHandle->snapshotVer = ver;
req.qmsg = NULL; req.qmsg = NULL;
SReadHandle handle = {
.meta = pTq->pVnode->pMeta, pHandle->execHandle.task =
.vnode = pTq->pVnode,
.initTableReader = true,
.initTqReader = true,
.version = ver,
};
pHandle->execHandle.execCol.task =
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols,
&pHandle->execHandle.pSchemaWrapper); &pHandle->execHandle.pSchemaWrapper);
ASSERT(pHandle->execHandle.execCol.task); ASSERT(pHandle->execHandle.task);
void* scanner = NULL; void* scanner = NULL;
qExtractStreamScanner(pHandle->execHandle.execCol.task, &scanner); qExtractStreamScanner(pHandle->execHandle.task, &scanner);
ASSERT(scanner); ASSERT(scanner);
pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
ASSERT(pHandle->execHandle.pExecReader); ASSERT(pHandle->execHandle.pExecReader);
@ -579,10 +618,20 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode); pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
pHandle->execHandle.execDb.pFilterOutTbUid = pHandle->execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, handle.sContext);
pHandle->sContext = handle.sContext;
pHandle->execHandle.task =
qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, handle.sContext);
pHandle->sContext = handle.sContext;
pHandle->execHandle.execTb.suid = req.suid; pHandle->execHandle.execTb.suid = req.suid;
pHandle->execHandle.task =
qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t)); SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList); vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, req.suid); tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, req.suid);

View File

@ -62,16 +62,16 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
const STqExecHandle* pExec = &pHandle->execHandle; const STqExecHandle* pExec = &pHandle->execHandle;
qTaskInfo_t task = pExec->execCol.task; qTaskInfo_t task = pExec->task;
if (qStreamPrepareScan(task, pOffset) < 0) { if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
tqDebug("prepare scan failed, return"); tqDebug("prepare scan failed, return");
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
pRsp->rspOffset = *pOffset; pRsp->rspOffset = *pOffset;
return 0; return 0;
} else { } else {
tqOffsetResetToLog(pOffset, pHandle->snapshotVer); tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
if (qStreamPrepareScan(task, pOffset) < 0) { if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
tqDebug("prepare scan failed, return"); tqDebug("prepare scan failed, return");
pRsp->rspOffset = *pOffset; pRsp->rspOffset = *pOffset;
return 0; return 0;
@ -108,12 +108,23 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
rowCnt += pDataBlock->info.rows; rowCnt += pDataBlock->info.rows;
if (rowCnt <= 4096) continue; if (rowCnt <= 4096) continue;
} }
} else {
if(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){
SMetaTableInfo mtInfo = getUidfromSnapShot(pHandle->sContext);
if (mtInfo.uid == 0){ //read snapshot done, change to get data from wal
}else{
pOffset->uid = mtInfo.uid;
qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
continue;
}
}
} }
if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqDebug("vgId: %d, tsdb consume over, switch to wal, ver %ld", TD_VID(pTq->pVnode), pHandle->snapshotVer + 1); tqDebug("vgId: %d, tsdb consume over, switch to wal, ver %ld", TD_VID(pTq->pVnode), pHandle->snapshotVer + 1);
tqOffsetResetToLog(pOffset, pHandle->snapshotVer); tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
qStreamPrepareScan(task, pOffset); qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
continue; continue;
} }

View File

@ -79,11 +79,11 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
.version = handle.snapshotVer, .version = handle.snapshotVer,
}; };
handle.execHandle.execCol.task = qCreateQueueExecTaskInfo( handle.execHandle.task = qCreateQueueExecTaskInfo(
handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper); handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper);
ASSERT(handle.execHandle.execCol.task); ASSERT(handle.execHandle.task);
void* scanner = NULL; void* scanner = NULL;
qExtractStreamScanner(handle.execHandle.execCol.task, &scanner); qExtractStreamScanner(handle.execHandle.task, &scanner);
ASSERT(scanner); ASSERT(scanner);
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
ASSERT(handle.execHandle.pExecReader); ASSERT(handle.execHandle.pExecReader);

View File

@ -398,7 +398,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
if (pIter == NULL) break; if (pIter == NULL) break;
STqHandle* pExec = (STqHandle*)pIter; STqHandle* pExec = (STqHandle*)pIter;
if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
int32_t code = qUpdateQualifiedTableId(pExec->execHandle.execCol.task, tbUidList, isAdd); int32_t code = qUpdateQualifiedTableId(pExec->execHandle.task, tbUidList, isAdd);
ASSERT(code == 0); ASSERT(code == 0);
} else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__DB) { } else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__DB) {
if (!isAdd) { if (!isAdd) {

View File

@ -490,6 +490,21 @@ typedef struct SStreamScanInfo {
SNode* pTagIndexCond; SNode* pTagIndexCond;
} SStreamScanInfo; } SStreamScanInfo;
typedef struct SStreamRawScanInfo{
// int8_t subType;
// bool withMeta;
// int64_t suid;
// int64_t snapVersion;
// void *metaInfo;
// void *dataInfo;
SReadHandle * readHandle;
SSDataBlock pRes; // result SSDataBlock
uint64_t groupId;
STsdbReader* dataReader;
SSnapContext* sContext;
}SStreamRawScanInfo;
typedef struct SSysTableScanInfo { typedef struct SSysTableScanInfo {
SRetrieveMetaTableRsp* pRsp; SRetrieveMetaTableRsp* pRsp;
SRetrieveTableReq req; SRetrieveTableReq req;
@ -929,6 +944,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,

View File

@ -117,7 +117,23 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) { qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) {
if (msg == NULL) { if (msg == NULL) {
// TODO create raw scan // TODO create raw scan
return NULL;
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
if (NULL == pTaskInfo) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTaskInfo->cost.created = taosGetTimestampMs();
pTaskInfo->execModel = OPTR_EXEC_MODEL_QUEUE;
pTaskInfo->pRoot = createRawScanOperatorInfo(readers, pTaskInfo);
if(NULL == pTaskInfo->pRoot){
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pTaskInfo);
return NULL;
}
return pTaskInfo;
} }
struct SSubplan* pPlan = NULL; struct SSubplan* pPlan = NULL;
@ -582,102 +598,154 @@ int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
return 0; return 0;
} }
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) { int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo mtInfo) {
memset(pCond, 0, sizeof(SQueryTableDataCond));
pCond->order = TSDB_ORDER_ASC;
pCond->numOfCols = mtInfo.schema->nCols;
pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
if (pCond->colList == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return terrno;
}
pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
pCond->suid = mtInfo.suid;
pCond->type = TIMEWINDOW_RANGE_CONTAINED;
pCond->startVersion = -1;
pCond->endVersion = sContext->snapVersion;
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
pCond->colList[i].type = mtInfo.schema->pSchema[i].type;
pCond->colList[i].bytes = mtInfo.schema->pSchema[i].bytes;
pCond->colList[i].colId = mtInfo.schema->pSchema[i].colId;
}
return TSDB_CODE_SUCCESS;
}
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot; SOperatorInfo* pOperator = pTaskInfo->pRoot;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
pTaskInfo->streamInfo.prepareStatus = *pOffset; pTaskInfo->streamInfo.prepareStatus = *pOffset;
if (!tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) { if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) {
while (1) { return 0;
uint8_t type = pOperator->operatorType; }
pOperator->status = OP_OPENED; if (subType == TOPIC_SUB_TYPE__COLUMN) {
// TODO add more check uint8_t type = pOperator->operatorType;
if (type != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { pOperator->status = OP_OPENED;
ASSERT(pOperator->numOfDownstream == 1); // TODO add more check
pOperator = pOperator->pDownstream[0]; if (type != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
} ASSERT(pOperator->numOfDownstream == 1);
pOperator = pOperator->pDownstream[0];
}
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
tsdbReaderClose(pTSInfo->dataReader); tsdbReaderClose(pTSInfo->dataReader);
pTSInfo->dataReader = NULL; pTSInfo->dataReader = NULL;
#if 0 #if 0
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) && if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
pInfo->tqReader->pWalReader->curVersion != pOffset->version) { pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
qError("prepare scan ver %ld actual ver %ld, last %ld", pOffset->version, qError("prepare scan ver %ld actual ver %ld, last %ld", pOffset->version,
pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version); pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version);
ASSERT(0); ASSERT(0);
} }
#endif #endif
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) { if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) {
return -1;
}
ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1);
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
int64_t uid = pOffset->uid;
int64_t ts = pOffset->ts;
if (uid == 0) {
if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
uid = pTableInfo->uid;
ts = INT64_MIN;
} else {
return -1; return -1;
} }
ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1); }
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
int64_t uid = pOffset->uid;
int64_t ts = pOffset->ts;
if (uid == 0) { /*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) { /*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
uid = pTableInfo->uid; int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
ts = INT64_MIN;
} else {
return -1;
}
}
/*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
/*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
#ifndef NDEBUG #ifndef NDEBUG
qDebug("switch to next table %ld (cursor %d), %ld rows returned", uid, pTableScanInfo->currentTable, qDebug("switch to next table %ld (cursor %d), %ld rows returned", uid, pTableScanInfo->currentTable,
pInfo->pTableScanOp->resultInfo.totalRows); pInfo->pTableScanOp->resultInfo.totalRows);
pInfo->pTableScanOp->resultInfo.totalRows = 0; pInfo->pTableScanOp->resultInfo.totalRows = 0;
#endif #endif
bool found = false; bool found = false;
for (int32_t i = 0; i < tableSz; i++) { for (int32_t i = 0; i < tableSz; i++) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i); STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
if (pTableInfo->uid == uid) { if (pTableInfo->uid == uid) {
found = true; found = true;
pTableScanInfo->currentTable = i; pTableScanInfo->currentTable = i;
break; break;
}
} }
// TODO after dropping table, table may be not found
ASSERT(found);
if (pTableScanInfo->dataReader == NULL) {
if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond,
pTaskInfo->tableqinfoList.pTableList, &pTableScanInfo->dataReader, NULL) < 0 ||
pTableScanInfo->dataReader == NULL) {
ASSERT(0);
}
}
tsdbSetTableId(pTableScanInfo->dataReader, uid);
int64_t oldSkey = pTableScanInfo->cond.twindows.skey;
pTableScanInfo->cond.twindows.skey = ts + 1;
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
pTableScanInfo->cond.twindows.skey = oldSkey;
pTableScanInfo->scanTimes = 0;
qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts,
pTableScanInfo->currentTable, tableSz);
/*}*/
} else {
ASSERT(0);
} }
return 0;
// TODO after dropping table, table may be not found
ASSERT(found);
if (pTableScanInfo->dataReader == NULL) {
if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond,
pTaskInfo->tableqinfoList.pTableList, &pTableScanInfo->dataReader, NULL) < 0 ||
pTableScanInfo->dataReader == NULL) {
ASSERT(0);
}
}
tsdbSetTableId(pTableScanInfo->dataReader, uid);
int64_t oldSkey = pTableScanInfo->cond.twindows.skey;
pTableScanInfo->cond.twindows.skey = ts + 1;
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
pTableScanInfo->cond.twindows.skey = oldSkey;
pTableScanInfo->scanTimes = 0;
qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts,
pTableScanInfo->currentTable, tableSz);
/*}*/
} else {
ASSERT(0);
} }
}else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){
SStreamRawScanInfo* pInfo = pOperator->info;
SSnapContext* sContext = pInfo->sContext;
if(setDataForSnapShot(sContext, pOffset->uid) != 0) {
qError("setDataForSnapShot error. uid:%"PRIi64, pOffset->uid);
return -1;
}
SMetaTableInfo mtInfo = getUidfromSnapShot(sContext);
if(pOffset->uid == 0) pOffset->uid = mtInfo.uid;
if(pOffset->ts == 0) pOffset->ts = INT64_MIN;
if (pOffset->uid == 0) {
return -1;
}
tsdbReaderClose(pInfo->dataReader);
pInfo->dataReader = NULL;
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
taosArrayDestroy(pTaskInfo->tableqinfoList.pTableList);
initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, mtInfo);
pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
pTaskInfo->tableqinfoList.pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0});
tsdbReaderOpen(pInfo->readHandle->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList, &pInfo->dataReader, NULL);
qDebug("tsdb reader snapshot change to uid %ld ts %ld", pOffset->uid, pOffset->ts);
} }
return 0; return 0;
} }

View File

@ -1234,9 +1234,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} }
} else if (ret.fetchType == FETCH_TYPE__META) { } else if (ret.fetchType == FETCH_TYPE__META) {
ASSERT(0); ASSERT(0);
pTaskInfo->streamInfo.lastStatus = ret.offset; // pTaskInfo->streamInfo.lastStatus = ret.offset;
pTaskInfo->streamInfo.metaBlk = ret.meta; // pTaskInfo->streamInfo.metaBlk = ret.meta;
return NULL; // return NULL;
} else if (ret.fetchType == FETCH_TYPE__NONE) { } else if (ret.fetchType == FETCH_TYPE__NONE) {
pTaskInfo->streamInfo.lastStatus = ret.offset; pTaskInfo->streamInfo.lastStatus = ret.offset;
ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version); ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version);
@ -1257,10 +1257,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} }
qDebug("stream scan tsdb return null"); qDebug("stream scan tsdb return null");
return NULL; return NULL;
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) {
// TODO scan meta
ASSERT(0);
return NULL;
} }
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE) { if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE) {
@ -1444,11 +1440,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} }
} }
static SSDataBlock* doRawScan(SOperatorInfo* pInfo) {
//
return NULL;
}
static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) { static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t)); SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));
@ -1461,17 +1452,76 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
return tableIdList; return tableIdList;
} }
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamRawScanInfo* pInfo = pOperator->info;
qDebug("stream scan called");
ASSERT(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA);
SSDataBlock* pBlock = &pInfo->pRes;
while (tsdbNextDataBlock(pInfo->dataReader)) {
if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
SDataBlockInfo binfo = pBlock->info;
tsdbRetrieveDataBlockInfo(pInfo->dataReader, &binfo);
pBlock->info = binfo;
SArray* pCols = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
if (pCols == NULL) {
return NULL;
}
// size_t numOfSrcCols = taosArrayGetSize(pCols);
// for (int i = 0; i < taosArrayGetSize(pCols); i++) {
// SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pmInfo->targetSlotId);
// colDataAssign(pDst, p, pBlock->info.rows, &pBlock->info);
// }
pBlock->pDataBlock = pCols;
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.uid;
pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
return pBlock;
}
qDebug("stream scan tsdb return null");
return NULL;
}
// for subscribing db or stb (not including column), // for subscribing db or stb (not including column),
// if this scan is used, meta data can be return // if this scan is used, meta data can be return
// and schemas are decided when scanning // and schemas are decided when scanning
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo) {
SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup) {
// create operator // create operator
// create tb reader // create tb reader
// create meta reader // create meta reader
// create tq reader // create tq reader
return NULL; SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
pInfo->readHandle = pHandle;
pInfo->sContext = pHandle->sContext;
pOperator->name = "RawStreamScanOperator";
// pOperator->blocking = false;
// pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, NULL,
NULL, NULL, NULL);
return pOperator;
} }
static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) { static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {