Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/tsdb_optimize
This commit is contained in:
commit
48ea3963e7
|
@ -214,7 +214,7 @@ int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
|
||||||
|
|
||||||
void walRefFirstVer(SWal *, SWalRef *);
|
void walRefFirstVer(SWal *, SWalRef *);
|
||||||
void walRefLastVer(SWal *, SWalRef *);
|
void walRefLastVer(SWal *, SWalRef *);
|
||||||
SWalRef *walRefCommittedVer(SWal *);
|
void walRefCommitVer(SWal *, SWalRef *);
|
||||||
|
|
||||||
SWalRef *walOpenRef(SWal *);
|
SWalRef *walOpenRef(SWal *);
|
||||||
void walCloseRef(SWal *pWal, int64_t refId);
|
void walCloseRef(SWal *pWal, int64_t refId);
|
||||||
|
|
|
@ -31,7 +31,7 @@ typedef void *(*__array_item_dup_fn_t)(void *);
|
||||||
|
|
||||||
typedef void (*FDelete)(void *);
|
typedef void (*FDelete)(void *);
|
||||||
typedef int32_t (*FEncode)(void **buf, const void *dst);
|
typedef int32_t (*FEncode)(void **buf, const void *dst);
|
||||||
typedef void *(*FDecode)(const void *buf, void *dst);
|
typedef void *(*FDecode)(const void *buf, void *dst, int8_t sver);
|
||||||
|
|
||||||
#define TD_EQ 0x1
|
#define TD_EQ 0x1
|
||||||
#define TD_GT 0x2
|
#define TD_GT 0x2
|
||||||
|
|
|
@ -244,7 +244,7 @@ int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t
|
||||||
void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param);
|
void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param);
|
||||||
|
|
||||||
int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode);
|
int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode);
|
||||||
void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz);
|
void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz, int8_t sver);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -1756,9 +1756,8 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
|
||||||
request->code = code;
|
request->code = code;
|
||||||
info->cost.endTime = taosGetTimestampUs();
|
info->cost.endTime = taosGetTimestampUs();
|
||||||
info->cost.code = code;
|
info->cost.code = code;
|
||||||
if (code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING ||
|
if (NEED_CLIENT_HANDLE_ERROR(code) || code == TSDB_CODE_SDB_OBJ_CREATING ||
|
||||||
code == TSDB_CODE_PAR_VALUE_TOO_LONG || code == TSDB_CODE_MND_TRANS_CONFLICT ||
|
code == TSDB_CODE_PAR_VALUE_TOO_LONG || code == TSDB_CODE_MND_TRANS_CONFLICT) {
|
||||||
code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
|
||||||
if (cnt++ >= 10) {
|
if (cnt++ >= 10) {
|
||||||
uInfo("SML:%" PRIx64 " retry:%d/10 end code:%d, msg:%s", info->id, cnt, code, tstrerror(code));
|
uInfo("SML:%" PRIx64 " retry:%d/10 end code:%d, msg:%s", info->id, cnt, code, tstrerror(code));
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -566,14 +566,14 @@ void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
char* qmsg; // SubPlanToString
|
// char* qmsg; // SubPlanToString
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
} SMqVgEp;
|
} SMqVgEp;
|
||||||
|
|
||||||
SMqVgEp* tCloneSMqVgEp(const SMqVgEp* pVgEp);
|
SMqVgEp* tCloneSMqVgEp(const SMqVgEp* pVgEp);
|
||||||
void tDeleteSMqVgEp(SMqVgEp* pVgEp);
|
void tDeleteSMqVgEp(SMqVgEp* pVgEp);
|
||||||
int32_t tEncodeSMqVgEp(void** buf, const SMqVgEp* pVgEp);
|
int32_t tEncodeSMqVgEp(void** buf, const SMqVgEp* pVgEp);
|
||||||
void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp);
|
void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp, int8_t sver);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t consumerId; // -1 for unassigned
|
int64_t consumerId; // -1 for unassigned
|
||||||
|
@ -598,6 +598,7 @@ typedef struct {
|
||||||
SArray* unassignedVgs; // SArray<SMqVgEp*>
|
SArray* unassignedVgs; // SArray<SMqVgEp*>
|
||||||
SArray* offsetRows;
|
SArray* offsetRows;
|
||||||
char dbName[TSDB_DB_FNAME_LEN];
|
char dbName[TSDB_DB_FNAME_LEN];
|
||||||
|
char* qmsg; // SubPlanToString
|
||||||
} SMqSubscribeObj;
|
} SMqSubscribeObj;
|
||||||
|
|
||||||
SMqSubscribeObj* tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]);
|
SMqSubscribeObj* tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]);
|
||||||
|
@ -696,12 +697,12 @@ int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
|
||||||
int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj, int32_t sver);
|
int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj, int32_t sver);
|
||||||
void tFreeStreamObj(SStreamObj* pObj);
|
void tFreeStreamObj(SStreamObj* pObj);
|
||||||
|
|
||||||
typedef struct {
|
//typedef struct {
|
||||||
char streamName[TSDB_STREAM_FNAME_LEN];
|
// char streamName[TSDB_STREAM_FNAME_LEN];
|
||||||
int64_t uid;
|
// int64_t uid;
|
||||||
int64_t streamUid;
|
// int64_t streamUid;
|
||||||
SArray* childInfo; // SArray<SStreamChildEpInfo>
|
// SArray* childInfo; // SArray<SStreamChildEpInfo>
|
||||||
} SStreamCheckpointObj;
|
//} SStreamCheckpointObj;
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -665,7 +665,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
SCMSubscribeReq subscribe = {0};
|
SCMSubscribeReq subscribe = {0};
|
||||||
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
|
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
|
||||||
|
|
||||||
uint64_t consumerId = subscribe.consumerId;
|
int64_t consumerId = subscribe.consumerId;
|
||||||
char *cgroup = subscribe.cgroup;
|
char *cgroup = subscribe.cgroup;
|
||||||
SMqConsumerObj *pExistedConsumer = NULL;
|
SMqConsumerObj *pExistedConsumer = NULL;
|
||||||
SMqConsumerObj *pConsumerNew = NULL;
|
SMqConsumerObj *pConsumerNew = NULL;
|
||||||
|
|
|
@ -187,14 +187,14 @@ SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
|
||||||
SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
|
SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
|
||||||
if (pVgEpNew == NULL) return NULL;
|
if (pVgEpNew == NULL) return NULL;
|
||||||
pVgEpNew->vgId = pVgEp->vgId;
|
pVgEpNew->vgId = pVgEp->vgId;
|
||||||
pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg);
|
// pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg);
|
||||||
pVgEpNew->epSet = pVgEp->epSet;
|
pVgEpNew->epSet = pVgEp->epSet;
|
||||||
return pVgEpNew;
|
return pVgEpNew;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
|
void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
|
||||||
if (pVgEp) {
|
if (pVgEp) {
|
||||||
taosMemoryFreeClear(pVgEp->qmsg);
|
// taosMemoryFreeClear(pVgEp->qmsg);
|
||||||
taosMemoryFree(pVgEp);
|
taosMemoryFree(pVgEp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -202,14 +202,18 @@ void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
|
||||||
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
|
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
|
tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
|
||||||
tlen += taosEncodeString(buf, pVgEp->qmsg);
|
// tlen += taosEncodeString(buf, pVgEp->qmsg);
|
||||||
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
|
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) {
|
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
|
||||||
buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
|
buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
|
||||||
buf = taosDecodeString(buf, &pVgEp->qmsg);
|
if(sver == 1){
|
||||||
|
uint64_t size = 0;
|
||||||
|
buf = taosDecodeVariantU64(buf, &size);
|
||||||
|
buf = POINTER_SHIFT(buf, size);
|
||||||
|
}
|
||||||
buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
|
buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
|
||||||
return (void *)buf;
|
return (void *)buf;
|
||||||
}
|
}
|
||||||
|
@ -387,7 +391,6 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
|
||||||
buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
|
buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
|
||||||
buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg);
|
buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg);
|
||||||
}
|
}
|
||||||
|
|
||||||
return (void *)buf;
|
return (void *)buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -395,13 +398,13 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
|
||||||
// SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
|
// SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
|
||||||
// if (pConsumerEpNew == NULL) return NULL;
|
// if (pConsumerEpNew == NULL) return NULL;
|
||||||
// pConsumerEpNew->consumerId = pConsumerEpOld->consumerId;
|
// pConsumerEpNew->consumerId = pConsumerEpOld->consumerId;
|
||||||
// pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
|
// pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, NULL);
|
||||||
// return pConsumerEpNew;
|
// return pConsumerEpNew;
|
||||||
//}
|
//}
|
||||||
//
|
//
|
||||||
//void tDeleteSMqConsumerEp(void *data) {
|
//void tDeleteSMqConsumerEp(void *data) {
|
||||||
// SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data;
|
// SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data;
|
||||||
// taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp);
|
// taosArrayDestroy(pConsumerEp->vgs);
|
||||||
//}
|
//}
|
||||||
|
|
||||||
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
|
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
|
||||||
|
@ -437,7 +440,7 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
|
||||||
|
|
||||||
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) {
|
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) {
|
||||||
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
|
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
|
||||||
buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp));
|
buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
|
||||||
if (sver > 1){
|
if (sver > 1){
|
||||||
int32_t szVgs = 0;
|
int32_t szVgs = 0;
|
||||||
buf = taosDecodeFixedI32(buf, &szVgs);
|
buf = taosDecodeFixedI32(buf, &szVgs);
|
||||||
|
@ -521,6 +524,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
|
||||||
pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
|
pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
|
||||||
pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL);
|
pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL);
|
||||||
memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
|
memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
|
||||||
|
pSubNew->qmsg = taosStrdup(pSub->qmsg);
|
||||||
return pSubNew;
|
return pSubNew;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -535,6 +539,7 @@ void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
|
||||||
}
|
}
|
||||||
taosHashCleanup(pSub->consumerHash);
|
taosHashCleanup(pSub->consumerHash);
|
||||||
taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp);
|
taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp);
|
||||||
|
taosMemoryFreeClear(pSub->qmsg);
|
||||||
taosArrayDestroy(pSub->offsetRows);
|
taosArrayDestroy(pSub->offsetRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -579,6 +584,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
|
||||||
// do nothing
|
// do nothing
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
tlen += taosEncodeString(buf, pSub->qmsg);
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -601,7 +607,7 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
|
||||||
taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp));
|
taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp));
|
||||||
}
|
}
|
||||||
|
|
||||||
buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp));
|
buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
|
||||||
buf = taosDecodeStringTo(buf, pSub->dbName);
|
buf = taosDecodeStringTo(buf, pSub->dbName);
|
||||||
|
|
||||||
if (sver > 1){
|
if (sver > 1){
|
||||||
|
@ -625,6 +631,7 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
buf = taosDecodeString(buf, &pSub->qmsg);
|
||||||
}
|
}
|
||||||
return (void *)buf;
|
return (void *)buf;
|
||||||
}
|
}
|
||||||
|
|
|
@ -570,23 +570,19 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
||||||
|
|
||||||
mDebug("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);
|
mDebug("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);
|
||||||
|
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
}
|
||||||
|
|
||||||
if (pSubplan) {
|
if (pSubplan) {
|
||||||
int32_t msgLen;
|
int32_t msgLen;
|
||||||
|
|
||||||
pSubplan->execNode.epSet = pVgEp->epSet;
|
if (qSubPlanToString(pSubplan, &pSub->qmsg, &msgLen) < 0) {
|
||||||
pSubplan->execNode.nodeId = pVgEp->vgId;
|
|
||||||
|
|
||||||
if (qSubPlanToString(pSubplan, &pVgEp->qmsg, &msgLen) < 0) {
|
|
||||||
sdbRelease(pSdb, pVgroup);
|
|
||||||
qDestroyQueryPlan(pPlan);
|
qDestroyQueryPlan(pPlan);
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pVgEp->qmsg = taosStrdup("");
|
pSub->qmsg = taosStrdup("");
|
||||||
}
|
|
||||||
|
|
||||||
sdbRelease(pSdb, pVgroup);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
qDestroyQueryPlan(pPlan);
|
qDestroyQueryPlan(pPlan);
|
||||||
|
|
|
@ -99,13 +99,23 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj
|
||||||
return pSub;
|
return pSub;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscribeObj *pSub,
|
static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub,
|
||||||
const SMqRebOutputVg *pRebVg) {
|
const SMqRebOutputVg *pRebVg, SSubplan* pPlan) {
|
||||||
SMqRebVgReq req = {0};
|
SMqRebVgReq req = {0};
|
||||||
req.oldConsumerId = pRebVg->oldConsumerId;
|
req.oldConsumerId = pRebVg->oldConsumerId;
|
||||||
req.newConsumerId = pRebVg->newConsumerId;
|
req.newConsumerId = pRebVg->newConsumerId;
|
||||||
req.vgId = pRebVg->pVgEp->vgId;
|
req.vgId = pRebVg->pVgEp->vgId;
|
||||||
req.qmsg = pRebVg->pVgEp->qmsg;
|
if(pPlan){
|
||||||
|
pPlan->execNode.epSet = pRebVg->pVgEp->epSet;
|
||||||
|
pPlan->execNode.nodeId = pRebVg->pVgEp->vgId;
|
||||||
|
int32_t msgLen;
|
||||||
|
if (qSubPlanToString(pPlan, &req.qmsg, &msgLen) < 0) {
|
||||||
|
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}else{
|
||||||
|
req.qmsg = taosStrdup("");
|
||||||
|
}
|
||||||
req.subType = pSub->subType;
|
req.subType = pSub->subType;
|
||||||
req.withMeta = pSub->withMeta;
|
req.withMeta = pSub->withMeta;
|
||||||
req.suid = pSub->stbUid;
|
req.suid = pSub->stbUid;
|
||||||
|
@ -115,6 +125,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, ret);
|
tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, ret);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
|
taosMemoryFree(req.qmsg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -122,6 +133,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
|
||||||
void *buf = taosMemoryMalloc(tlen);
|
void *buf = taosMemoryMalloc(tlen);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
taosMemoryFree(req.qmsg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -135,17 +147,19 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
|
||||||
if (tEncodeSMqRebVgReq(&encoder, &req) < 0) {
|
if (tEncodeSMqRebVgReq(&encoder, &req) < 0) {
|
||||||
taosMemoryFreeClear(buf);
|
taosMemoryFreeClear(buf);
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
|
taosMemoryFree(req.qmsg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
*pBuf = buf;
|
*pBuf = buf;
|
||||||
*pLen = tlen;
|
*pLen = tlen;
|
||||||
|
|
||||||
|
taosMemoryFree(req.qmsg);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub,
|
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
|
||||||
const SMqRebOutputVg *pRebVg) {
|
const SMqRebOutputVg *pRebVg, SSubplan* pPlan) {
|
||||||
// if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
|
// if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
|
||||||
// terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
|
// terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
|
||||||
// return -1;
|
// return -1;
|
||||||
|
@ -153,7 +167,7 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SM
|
||||||
|
|
||||||
void *buf;
|
void *buf;
|
||||||
int32_t tlen;
|
int32_t tlen;
|
||||||
if (mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg) < 0) {
|
if (mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg, pPlan) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -519,14 +533,25 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
|
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
|
||||||
|
struct SSubplan* pPlan = NULL;
|
||||||
|
if(strcmp(pOutput->pSub->qmsg, "") != 0){
|
||||||
|
int32_t code = qStringToSubplan(pOutput->pSub->qmsg, &pPlan);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
terrno = code;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
|
nodesDestroyNode((SNode*)pPlan);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL);
|
mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL);
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
|
nodesDestroyNode((SNode*)pPlan);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -536,11 +561,13 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
|
||||||
int32_t vgNum = taosArrayGetSize(rebVgs);
|
int32_t vgNum = taosArrayGetSize(rebVgs);
|
||||||
for (int32_t i = 0; i < vgNum; i++) {
|
for (int32_t i = 0; i < vgNum; i++) {
|
||||||
SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i);
|
SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i);
|
||||||
if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg) < 0) {
|
if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg, pPlan) < 0) {
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
|
nodesDestroyNode((SNode*)pPlan);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
nodesDestroyNode((SNode*)pPlan);
|
||||||
|
|
||||||
// 2. redo log: subscribe and vg assignment
|
// 2. redo log: subscribe and vg assignment
|
||||||
// subscribe
|
// subscribe
|
||||||
|
|
|
@ -139,6 +139,7 @@ static STqMgmt tqMgmt = {0};
|
||||||
|
|
||||||
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);
|
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);
|
||||||
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
|
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
|
||||||
|
void tqDestroyTqHandle(void* data);
|
||||||
|
|
||||||
// tqRead
|
// tqRead
|
||||||
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
|
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
|
||||||
|
@ -161,6 +162,8 @@ int32_t tqMetaRestoreHandle(STQ* pTq);
|
||||||
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen);
|
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen);
|
||||||
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key);
|
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key);
|
||||||
int32_t tqMetaRestoreCheckInfo(STQ* pTq);
|
int32_t tqMetaRestoreCheckInfo(STQ* pTq);
|
||||||
|
int32_t tqMetaGetHandle(STQ* pTq, const char* key);
|
||||||
|
int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle);
|
||||||
|
|
||||||
STqOffsetStore* tqOffsetOpen(STQ* pTq);
|
STqOffsetStore* tqOffsetOpen(STQ* pTq);
|
||||||
void tqOffsetClose(STqOffsetStore*);
|
void tqOffsetClose(STqOffsetStore*);
|
||||||
|
|
|
@ -62,7 +62,7 @@ void tqCleanUp() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destroyTqHandle(void* data) {
|
void tqDestroyTqHandle(void* data) {
|
||||||
STqHandle* pData = (STqHandle*)data;
|
STqHandle* pData = (STqHandle*)data;
|
||||||
qDestroyTask(pData->execHandle.task);
|
qDestroyTask(pData->execHandle.task);
|
||||||
|
|
||||||
|
@ -102,7 +102,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
|
||||||
pTq->walLogLastVer = pVnode->pWal->vers.lastVer;
|
pTq->walLogLastVer = pVnode->pWal->vers.lastVer;
|
||||||
|
|
||||||
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||||
taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
|
taosHashSetFreeFp(pTq->pHandle, tqDestroyTqHandle);
|
||||||
|
|
||||||
taosInitRWLatch(&pTq->lock);
|
taosInitRWLatch(&pTq->lock);
|
||||||
pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
|
pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
|
||||||
|
@ -661,13 +661,17 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SVnode* pVnode = pTq->pVnode;
|
tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
|
||||||
int32_t vgId = TD_VID(pVnode);
|
|
||||||
|
|
||||||
tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
|
|
||||||
req.oldConsumerId, req.newConsumerId);
|
req.oldConsumerId, req.newConsumerId);
|
||||||
|
|
||||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
STqHandle* pHandle = NULL;
|
||||||
|
while(1){
|
||||||
|
pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||||
|
if (pHandle || tqMetaGetHandle(pTq, req.subKey) < 0){
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (pHandle == NULL) {
|
if (pHandle == NULL) {
|
||||||
if (req.oldConsumerId != -1) {
|
if (req.oldConsumerId != -1) {
|
||||||
tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
|
tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
|
||||||
|
@ -678,86 +682,13 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
|
tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
STqHandle handle = {0};
|
||||||
STqHandle tqHandle = {0};
|
ret = tqCreateHandle(pTq, &req, &handle);
|
||||||
pHandle = &tqHandle;
|
if(ret < 0){
|
||||||
|
tqDestroyTqHandle(&handle);
|
||||||
memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
|
|
||||||
pHandle->consumerId = req.newConsumerId;
|
|
||||||
pHandle->epoch = -1;
|
|
||||||
|
|
||||||
pHandle->execHandle.subType = req.subType;
|
|
||||||
pHandle->fetchMeta = req.withMeta;
|
|
||||||
|
|
||||||
// TODO version should be assigned and refed during preprocess
|
|
||||||
SWalRef* pRef = walRefCommittedVer(pVnode->pWal);
|
|
||||||
if (pRef == NULL) {
|
|
||||||
ret = -1;
|
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
|
||||||
int64_t ver = pRef->refVer;
|
|
||||||
pHandle->pRef = pRef;
|
|
||||||
|
|
||||||
SReadHandle handle = {.vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = ver};
|
|
||||||
initStorageAPI(&handle.api);
|
|
||||||
|
|
||||||
pHandle->snapshotVer = ver;
|
|
||||||
|
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
|
||||||
pHandle->execHandle.execCol.qmsg = taosStrdup(req.qmsg);
|
|
||||||
|
|
||||||
pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId,
|
|
||||||
&pHandle->execHandle.numOfCols, req.newConsumerId);
|
|
||||||
void* scanner = NULL;
|
|
||||||
qExtractStreamScanner(pHandle->execHandle.task, &scanner);
|
|
||||||
pHandle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
|
|
||||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
|
||||||
pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
|
|
||||||
pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
|
|
||||||
|
|
||||||
pHandle->execHandle.execDb.pFilterOutTbUid =
|
|
||||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
|
||||||
buildSnapContext(handle.vnode, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
|
|
||||||
(SSnapContext**)(&handle.sContext));
|
|
||||||
|
|
||||||
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
|
|
||||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
|
||||||
pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
|
|
||||||
pHandle->execHandle.execTb.suid = req.suid;
|
|
||||||
pHandle->execHandle.execTb.qmsg = taosStrdup(req.qmsg);
|
|
||||||
|
|
||||||
if (strcmp(pHandle->execHandle.execTb.qmsg, "") != 0) {
|
|
||||||
if (nodesStringToNode(pHandle->execHandle.execTb.qmsg, &pHandle->execHandle.execTb.node) != 0) {
|
|
||||||
tqError("nodesStringToNode error in sub stable, since %s, vgId:%d, subkey:%s consumer:0x%" PRIx64, terrstr(),
|
|
||||||
pVnode->config.vgId, req.subKey, pHandle->consumerId);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
buildSnapContext(handle.vnode, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
|
|
||||||
(SSnapContext**)(&handle.sContext));
|
|
||||||
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
|
|
||||||
|
|
||||||
SArray* tbUidList = NULL;
|
|
||||||
ret = qGetTableList(req.suid, pVnode, pHandle->execHandle.execTb.node, &tbUidList, pHandle->execHandle.task);
|
|
||||||
if (ret != TDB_CODE_SUCCESS) {
|
|
||||||
tqError("qGetTableList error:%d vgId:%d, subkey:%s consumer:0x%" PRIx64, ret, pVnode->config.vgId, req.subKey,
|
|
||||||
pHandle->consumerId);
|
|
||||||
taosArrayDestroy(tbUidList);
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
tqDebug("tq try to get ctb for stb subscribe, vgId:%d, subkey:%s consumer:0x%" PRIx64 " suid:%" PRId64,
|
|
||||||
pVnode->config.vgId, req.subKey, pHandle->consumerId, req.suid);
|
|
||||||
pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
|
|
||||||
tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList, NULL);
|
|
||||||
taosArrayDestroy(tbUidList);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
|
||||||
tqDebug("try to persist handle %s consumer:0x%" PRIx64, req.subKey, pHandle->consumerId);
|
|
||||||
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
|
||||||
goto end;
|
|
||||||
} else {
|
} else {
|
||||||
taosWLockLatch(&pTq->lock);
|
taosWLockLatch(&pTq->lock);
|
||||||
|
|
||||||
|
|
|
@ -88,9 +88,9 @@ int32_t tqMetaOpen(STQ* pTq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tqMetaRestoreHandle(pTq) < 0) {
|
// if (tqMetaRestoreHandle(pTq) < 0) {
|
||||||
return -1;
|
// return -1;
|
||||||
}
|
// }
|
||||||
|
|
||||||
if (tqMetaRestoreCheckInfo(pTq) < 0) {
|
if (tqMetaRestoreCheckInfo(pTq) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -274,6 +274,120 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int buildHandle(STQ* pTq, STqHandle* handle){
|
||||||
|
SVnode* pVnode = pTq->pVnode;
|
||||||
|
int32_t vgId = TD_VID(pVnode);
|
||||||
|
|
||||||
|
handle->pRef = walOpenRef(pVnode->pWal);
|
||||||
|
if (handle->pRef == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
walSetRefVer(handle->pRef, handle->snapshotVer);
|
||||||
|
|
||||||
|
SReadHandle reader = {
|
||||||
|
.vnode = pVnode,
|
||||||
|
.initTableReader = true,
|
||||||
|
.initTqReader = true,
|
||||||
|
.version = handle->snapshotVer,
|
||||||
|
};
|
||||||
|
|
||||||
|
initStorageAPI(&reader.api);
|
||||||
|
|
||||||
|
if (handle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
|
handle->execHandle.task =
|
||||||
|
qCreateQueueExecTaskInfo(handle->execHandle.execCol.qmsg, &reader, vgId, &handle->execHandle.numOfCols, handle->consumerId);
|
||||||
|
if (handle->execHandle.task == NULL) {
|
||||||
|
tqError("cannot create exec task for %s", handle->subKey);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
void* scanner = NULL;
|
||||||
|
qExtractStreamScanner(handle->execHandle.task, &scanner);
|
||||||
|
if (scanner == NULL) {
|
||||||
|
tqError("cannot extract stream scanner for %s", handle->subKey);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
handle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
|
||||||
|
if (handle->execHandle.pTqReader == NULL) {
|
||||||
|
tqError("cannot extract exec reader for %s", handle->subKey);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
} else if (handle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||||
|
handle->pWalReader = walOpenReader(pVnode->pWal, NULL);
|
||||||
|
handle->execHandle.pTqReader = tqReaderOpen(pVnode);
|
||||||
|
|
||||||
|
buildSnapContext(reader.vnode, reader.version, 0, handle->execHandle.subType, handle->fetchMeta,
|
||||||
|
(SSnapContext**)(&reader.sContext));
|
||||||
|
handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);
|
||||||
|
} else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
|
handle->pWalReader = walOpenReader(pVnode->pWal, NULL);
|
||||||
|
|
||||||
|
if(handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) {
|
||||||
|
if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) {
|
||||||
|
tqError("nodesStringToNode error in sub stable, since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
buildSnapContext(reader.vnode, reader.version, handle->execHandle.execTb.suid, handle->execHandle.subType,
|
||||||
|
handle->fetchMeta, (SSnapContext**)(&reader.sContext));
|
||||||
|
handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);
|
||||||
|
|
||||||
|
SArray* tbUidList = NULL;
|
||||||
|
int ret = qGetTableList(handle->execHandle.execTb.suid, pVnode, handle->execHandle.execTb.node, &tbUidList, handle->execHandle.task);
|
||||||
|
if(ret != TDB_CODE_SUCCESS) {
|
||||||
|
tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle->subKey, handle->consumerId);
|
||||||
|
taosArrayDestroy(tbUidList);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId, handle->execHandle.execTb.suid);
|
||||||
|
handle->execHandle.pTqReader = tqReaderOpen(pVnode);
|
||||||
|
tqReaderSetTbUidList(handle->execHandle.pTqReader, tbUidList, NULL);
|
||||||
|
taosArrayDestroy(tbUidList);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int restoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){
|
||||||
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
SDecoder decoder;
|
||||||
|
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
||||||
|
tDecodeSTqHandle(&decoder, handle);
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
|
if(buildHandle(pTq, handle) < 0){
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
|
||||||
|
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
|
||||||
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
|
memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN);
|
||||||
|
handle->consumerId = req->newConsumerId;
|
||||||
|
handle->epoch = -1;
|
||||||
|
|
||||||
|
handle->execHandle.subType = req->subType;
|
||||||
|
handle->fetchMeta = req->withMeta;
|
||||||
|
if(req->subType == TOPIC_SUB_TYPE__COLUMN){
|
||||||
|
handle->execHandle.execCol.qmsg = taosStrdup(req->qmsg);
|
||||||
|
}else if(req->subType == TOPIC_SUB_TYPE__DB){
|
||||||
|
handle->execHandle.execDb.pFilterOutTbUid =
|
||||||
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||||
|
}else if(req->subType == TOPIC_SUB_TYPE__TABLE){
|
||||||
|
handle->execHandle.execTb.suid = req->suid;
|
||||||
|
handle->execHandle.execTb.qmsg = taosStrdup(req->qmsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
handle->snapshotVer = walGetLastVer(pTq->pVnode->pWal);
|
||||||
|
|
||||||
|
if(buildHandle(pTq, handle) < 0){
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
|
||||||
|
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tqMetaRestoreHandle(STQ* pTq) {
|
int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
int code = 0;
|
int code = 0;
|
||||||
TBC* pCur = NULL;
|
TBC* pCur = NULL;
|
||||||
|
@ -281,97 +395,40 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
|
||||||
void* pKey = NULL;
|
void* pKey = NULL;
|
||||||
int kLen = 0;
|
int kLen = 0;
|
||||||
void* pVal = NULL;
|
void* pVal = NULL;
|
||||||
int vLen = 0;
|
int vLen = 0;
|
||||||
SDecoder decoder;
|
|
||||||
|
|
||||||
tdbTbcMoveToFirst(pCur);
|
tdbTbcMoveToFirst(pCur);
|
||||||
|
|
||||||
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
||||||
STqHandle handle = {0};
|
STqHandle handle = {0};
|
||||||
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
code = restoreHandle(pTq, pVal, vLen, &handle);
|
||||||
tDecodeSTqHandle(&decoder, &handle);
|
if (code < 0){
|
||||||
tDecoderClear(&decoder);
|
tqDestroyTqHandle(&handle);
|
||||||
|
break;
|
||||||
handle.pRef = walOpenRef(pTq->pVnode->pWal);
|
|
||||||
if (handle.pRef == NULL) {
|
|
||||||
code = -1;
|
|
||||||
goto end;
|
|
||||||
}
|
}
|
||||||
walSetRefVer(handle.pRef, handle.snapshotVer);
|
|
||||||
|
|
||||||
SReadHandle reader = {
|
|
||||||
.vnode = pTq->pVnode,
|
|
||||||
.initTableReader = true,
|
|
||||||
.initTqReader = true,
|
|
||||||
.version = handle.snapshotVer
|
|
||||||
};
|
|
||||||
|
|
||||||
initStorageAPI(&reader.api);
|
|
||||||
|
|
||||||
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
|
||||||
handle.execHandle.task =
|
|
||||||
qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, 0);
|
|
||||||
if (handle.execHandle.task == NULL) {
|
|
||||||
tqError("cannot create exec task for %s", handle.subKey);
|
|
||||||
code = -1;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
void* scanner = NULL;
|
|
||||||
qExtractStreamScanner(handle.execHandle.task, &scanner);
|
|
||||||
if (scanner == NULL) {
|
|
||||||
tqError("cannot extract stream scanner for %s", handle.subKey);
|
|
||||||
code = -1;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
handle.execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
|
|
||||||
if (handle.execHandle.pTqReader == NULL) {
|
|
||||||
tqError("cannot extract exec reader for %s", handle.subKey);
|
|
||||||
code = -1;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
|
||||||
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
|
||||||
handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode);
|
|
||||||
|
|
||||||
buildSnapContext(reader.vnode, reader.version, 0, handle.execHandle.subType, handle.fetchMeta,
|
|
||||||
(SSnapContext**)(&reader.sContext));
|
|
||||||
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);
|
|
||||||
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
|
||||||
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
|
||||||
|
|
||||||
if(handle.execHandle.execTb.qmsg != NULL && strcmp(handle.execHandle.execTb.qmsg, "") != 0) {
|
|
||||||
if (nodesStringToNode(handle.execHandle.execTb.qmsg, &handle.execHandle.execTb.node) != 0) {
|
|
||||||
tqError("nodesStringToNode error in sub stable, since %s", terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
buildSnapContext(reader.vnode, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType,
|
|
||||||
handle.fetchMeta, (SSnapContext**)(&reader.sContext));
|
|
||||||
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);
|
|
||||||
|
|
||||||
SArray* tbUidList = NULL;
|
|
||||||
int ret = qGetTableList(handle.execHandle.execTb.suid, pTq->pVnode, handle.execHandle.execTb.node, &tbUidList, handle.execHandle.task);
|
|
||||||
if(ret != TDB_CODE_SUCCESS) {
|
|
||||||
tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle.subKey, handle.consumerId);
|
|
||||||
taosArrayDestroy(tbUidList);
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid);
|
|
||||||
handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode);
|
|
||||||
tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList, NULL);
|
|
||||||
taosArrayDestroy(tbUidList);
|
|
||||||
}
|
|
||||||
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId);
|
|
||||||
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
end:
|
|
||||||
tdbFree(pKey);
|
tdbFree(pKey);
|
||||||
tdbFree(pVal);
|
tdbFree(pVal);
|
||||||
tdbTbcClose(pCur);
|
tdbTbcClose(pCur);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tqMetaGetHandle(STQ* pTq, const char* key) {
|
||||||
|
void* pVal = NULL;
|
||||||
|
int vLen = 0;
|
||||||
|
|
||||||
|
if (tdbTbGet(pTq->pExecStore, key, (int)strlen(key), &pVal, &vLen) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
STqHandle handle = {0};
|
||||||
|
int code = restoreHandle(pTq, pVal, vLen, &handle);
|
||||||
|
if (code < 0){
|
||||||
|
tqDestroyTqHandle(&handle);
|
||||||
|
}
|
||||||
|
tdbFree(pVal);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
|
@ -81,26 +81,11 @@ void walRefLastVer(SWal *pWal, SWalRef *pRef) {
|
||||||
wDebug("vgId:%d, wal ref version %" PRId64 " for last", pWal->cfg.vgId, ver);
|
wDebug("vgId:%d, wal ref version %" PRId64 " for last", pWal->cfg.vgId, ver);
|
||||||
}
|
}
|
||||||
|
|
||||||
SWalRef *walRefCommittedVer(SWal *pWal) {
|
void walRefCommitVer(SWal *pWal, SWalRef *pRef) {
|
||||||
SWalRef *pRef = walOpenRef(pWal);
|
|
||||||
if (pRef == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
taosThreadMutexLock(&pWal->mutex);
|
taosThreadMutexLock(&pWal->mutex);
|
||||||
|
|
||||||
int64_t ver = walGetCommittedVer(pWal);
|
int64_t ver = walGetCommittedVer(pWal);
|
||||||
|
|
||||||
wDebug("vgId:%d, wal ref version %" PRId64 " for committed", pWal->cfg.vgId, ver);
|
|
||||||
|
|
||||||
pRef->refVer = ver;
|
pRef->refVer = ver;
|
||||||
// bsearch in fileSet
|
|
||||||
SWalFileInfo tmpInfo;
|
|
||||||
tmpInfo.firstVer = ver;
|
|
||||||
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
|
||||||
ASSERT(pRet != NULL);
|
|
||||||
// pRef->refFile = pRet->firstVer;
|
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return pRef;
|
wDebug("vgId:%d, wal ref version %" PRId64 " for committed", pWal->cfg.vgId, ver);
|
||||||
}
|
}
|
||||||
|
|
|
@ -476,13 +476,13 @@ int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode) {
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz) {
|
void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz, int8_t sver) {
|
||||||
int32_t sz;
|
int32_t sz;
|
||||||
buf = taosDecodeFixedI32(buf, &sz);
|
buf = taosDecodeFixedI32(buf, &sz);
|
||||||
*pArray = taosArrayInit(sz, sizeof(void*));
|
*pArray = taosArrayInit(sz, sizeof(void*));
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
void* data = taosMemoryCalloc(1, dataSz);
|
void* data = taosMemoryCalloc(1, dataSz);
|
||||||
buf = decode(buf, data);
|
buf = decode(buf, data, sver);
|
||||||
taosArrayPush(*pArray, &data);
|
taosArrayPush(*pArray, &data);
|
||||||
}
|
}
|
||||||
return (void*)buf;
|
return (void*)buf;
|
||||||
|
|
Loading…
Reference in New Issue