Merge pull request #14278 from taosdata/feature/TD-11274-3.0
refactor: rsma restore
This commit is contained in:
commit
cf6f7516d9
|
@ -78,7 +78,6 @@ int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag);
|
||||||
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag);
|
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag);
|
||||||
int32_t tTagToValArray(const STag *pTag, SArray **ppArray);
|
int32_t tTagToValArray(const STag *pTag, SArray **ppArray);
|
||||||
void debugPrintSTag(STag *pTag, const char *tag, int32_t ln); // TODO: remove
|
void debugPrintSTag(STag *pTag, const char *tag, int32_t ln); // TODO: remove
|
||||||
void debugCheckTags(STag *pTag); // TODO: remove
|
|
||||||
|
|
||||||
// STRUCT =================
|
// STRUCT =================
|
||||||
struct STColumn {
|
struct STColumn {
|
||||||
|
|
|
@ -1886,7 +1886,7 @@ typedef struct SVCreateStbReq {
|
||||||
int8_t rollup;
|
int8_t rollup;
|
||||||
SSchemaWrapper schemaRow;
|
SSchemaWrapper schemaRow;
|
||||||
SSchemaWrapper schemaTag;
|
SSchemaWrapper schemaTag;
|
||||||
SRSmaParam pRSmaParam;
|
SRSmaParam rsmaParam;
|
||||||
} SVCreateStbReq;
|
} SVCreateStbReq;
|
||||||
|
|
||||||
int tEncodeSVCreateStbReq(SEncoder* pCoder, const SVCreateStbReq* pReq);
|
int tEncodeSVCreateStbReq(SEncoder* pCoder, const SVCreateStbReq* pReq);
|
||||||
|
|
|
@ -862,21 +862,6 @@ void debugPrintSTag(STag *pTag, const char *tag, int32_t ln) {
|
||||||
printf("\n");
|
printf("\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
void debugCheckTags(STag *pTag) {
|
|
||||||
switch (pTag->flags) {
|
|
||||||
case 0x0:
|
|
||||||
case 0x20:
|
|
||||||
case 0x40:
|
|
||||||
case 0x60:
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(pTag->nTag <= 128 && pTag->nTag >= 0);
|
|
||||||
ASSERT(pTag->ver <= 512 && pTag->ver >= 0); // temp condition for pTag->ver
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tPutTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
|
static int32_t tPutTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
|
||||||
int32_t n = 0;
|
int32_t n = 0;
|
||||||
|
|
||||||
|
@ -999,7 +984,6 @@ int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag) {
|
||||||
debugPrintSTag(*ppTag, __func__, __LINE__);
|
debugPrintSTag(*ppTag, __func__, __LINE__);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
debugCheckTags(*ppTag); // TODO: remove this line after debug
|
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
|
|
@ -4763,7 +4763,7 @@ int tEncodeSVCreateStbReq(SEncoder *pCoder, const SVCreateStbReq *pReq) {
|
||||||
if (tEncodeSSchemaWrapper(pCoder, &pReq->schemaRow) < 0) return -1;
|
if (tEncodeSSchemaWrapper(pCoder, &pReq->schemaRow) < 0) return -1;
|
||||||
if (tEncodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1;
|
if (tEncodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1;
|
||||||
if (pReq->rollup) {
|
if (pReq->rollup) {
|
||||||
if (tEncodeSRSmaParam(pCoder, &pReq->pRSmaParam) < 0) return -1;
|
if (tEncodeSRSmaParam(pCoder, &pReq->rsmaParam) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tEndEncode(pCoder);
|
tEndEncode(pCoder);
|
||||||
|
@ -4779,7 +4779,7 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) {
|
||||||
if (tDecodeSSchemaWrapper(pCoder, &pReq->schemaRow) < 0) return -1;
|
if (tDecodeSSchemaWrapper(pCoder, &pReq->schemaRow) < 0) return -1;
|
||||||
if (tDecodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1;
|
if (tDecodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1;
|
||||||
if (pReq->rollup) {
|
if (pReq->rollup) {
|
||||||
if (tDecodeSRSmaParam(pCoder, &pReq->pRSmaParam) < 0) return -1;
|
if (tDecodeSRSmaParam(pCoder, &pReq->rsmaParam) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tEndDecode(pCoder);
|
tEndDecode(pCoder);
|
||||||
|
|
|
@ -427,17 +427,17 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
|
||||||
req.schemaTag.pSchema = pStb->pTags;
|
req.schemaTag.pSchema = pStb->pTags;
|
||||||
|
|
||||||
if (req.rollup) {
|
if (req.rollup) {
|
||||||
req.pRSmaParam.maxdelay[0] = pStb->maxdelay[0];
|
req.rsmaParam.maxdelay[0] = pStb->maxdelay[0];
|
||||||
req.pRSmaParam.maxdelay[1] = pStb->maxdelay[1];
|
req.rsmaParam.maxdelay[1] = pStb->maxdelay[1];
|
||||||
if (pStb->ast1Len > 0) {
|
if (pStb->ast1Len > 0) {
|
||||||
if (mndConvertRsmaTask(&req.pRSmaParam.qmsg[0], &req.pRSmaParam.qmsgLen[0], pStb->pAst1, pStb->uid,
|
if (mndConvertRsmaTask(&req.rsmaParam.qmsg[0], &req.rsmaParam.qmsgLen[0], pStb->pAst1, pStb->uid,
|
||||||
STREAM_TRIGGER_WINDOW_CLOSE, req.pRSmaParam.watermark[0]) < 0) {
|
STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[0]) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (pStb->ast2Len > 0) {
|
if (pStb->ast2Len > 0) {
|
||||||
if (mndConvertRsmaTask(&req.pRSmaParam.qmsg[1], &req.pRSmaParam.qmsgLen[1], pStb->pAst2, pStb->uid,
|
if (mndConvertRsmaTask(&req.rsmaParam.qmsg[1], &req.rsmaParam.qmsgLen[1], pStb->pAst2, pStb->uid,
|
||||||
STREAM_TRIGGER_WINDOW_CLOSE, req.pRSmaParam.watermark[1]) < 0) {
|
STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[1]) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -470,12 +470,12 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
*pContLen = contLen;
|
*pContLen = contLen;
|
||||||
taosMemoryFreeClear(req.pRSmaParam.qmsg[0]);
|
taosMemoryFreeClear(req.rsmaParam.qmsg[0]);
|
||||||
taosMemoryFreeClear(req.pRSmaParam.qmsg[1]);
|
taosMemoryFreeClear(req.rsmaParam.qmsg[1]);
|
||||||
return pHead;
|
return pHead;
|
||||||
_err:
|
_err:
|
||||||
taosMemoryFreeClear(req.pRSmaParam.qmsg[0]);
|
taosMemoryFreeClear(req.rsmaParam.qmsg[0]);
|
||||||
taosMemoryFreeClear(req.pRSmaParam.qmsg[1]);
|
taosMemoryFreeClear(req.rsmaParam.qmsg[1]);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -199,15 +199,20 @@ typedef struct {
|
||||||
uint64_t groupId;
|
uint64_t groupId;
|
||||||
} STableKeyInfo;
|
} STableKeyInfo;
|
||||||
|
|
||||||
|
#define TABLE_ROLLUP_ON ((int8_t)0x1)
|
||||||
|
#define TABLE_IS_ROLLUP(FLG) (((FLG) & (TABLE_ROLLUP_ON)) != 0)
|
||||||
|
#define TABLE_SET_ROLLUP(FLG) ((FLG) |= TABLE_ROLLUP_ON)
|
||||||
struct SMetaEntry {
|
struct SMetaEntry {
|
||||||
int64_t version;
|
int64_t version;
|
||||||
int8_t type;
|
int8_t type;
|
||||||
|
int8_t flags; // TODO: need refactor?
|
||||||
tb_uid_t uid;
|
tb_uid_t uid;
|
||||||
char *name;
|
char *name;
|
||||||
union {
|
union {
|
||||||
struct {
|
struct {
|
||||||
SSchemaWrapper schemaRow;
|
SSchemaWrapper schemaRow;
|
||||||
SSchemaWrapper schemaTag;
|
SSchemaWrapper schemaTag;
|
||||||
|
SRSmaParam rsmaParam;
|
||||||
} stbEntry;
|
} stbEntry;
|
||||||
struct {
|
struct {
|
||||||
int64_t ctime;
|
int64_t ctime;
|
||||||
|
|
|
@ -24,22 +24,25 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
|
||||||
if (tEncodeCStr(pCoder, pME->name) < 0) return -1;
|
if (tEncodeCStr(pCoder, pME->name) < 0) return -1;
|
||||||
|
|
||||||
if (pME->type == TSDB_SUPER_TABLE) {
|
if (pME->type == TSDB_SUPER_TABLE) {
|
||||||
|
if (tEncodeI8(pCoder, pME->flags) < 0) return -1; // TODO: need refactor?
|
||||||
if (tEncodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaRow) < 0) return -1;
|
if (tEncodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaRow) < 0) return -1;
|
||||||
if (tEncodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaTag) < 0) return -1;
|
if (tEncodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaTag) < 0) return -1;
|
||||||
|
if (TABLE_IS_ROLLUP(pME->flags)) {
|
||||||
|
if (tEncodeSRSmaParam(pCoder, &pME->stbEntry.rsmaParam) < 0) return -1;
|
||||||
|
}
|
||||||
} else if (pME->type == TSDB_CHILD_TABLE) {
|
} else if (pME->type == TSDB_CHILD_TABLE) {
|
||||||
if (tEncodeI64(pCoder, pME->ctbEntry.ctime) < 0) return -1;
|
if (tEncodeI64(pCoder, pME->ctbEntry.ctime) < 0) return -1;
|
||||||
if (tEncodeI32(pCoder, pME->ctbEntry.ttlDays) < 0) return -1;
|
if (tEncodeI32(pCoder, pME->ctbEntry.ttlDays) < 0) return -1;
|
||||||
if (tEncodeI32(pCoder, pME->ctbEntry.commentLen) < 0) return -1;
|
if (tEncodeI32v(pCoder, pME->ctbEntry.commentLen) < 0) return -1;
|
||||||
if (pME->ctbEntry.commentLen > 0){
|
if (pME->ctbEntry.commentLen > 0){
|
||||||
if (tEncodeCStr(pCoder, pME->ctbEntry.comment) < 0) return -1;
|
if (tEncodeCStr(pCoder, pME->ctbEntry.comment) < 0) return -1;
|
||||||
}
|
}
|
||||||
if (tEncodeI64(pCoder, pME->ctbEntry.suid) < 0) return -1;
|
if (tEncodeI64(pCoder, pME->ctbEntry.suid) < 0) return -1;
|
||||||
debugCheckTags((STag*)pME->ctbEntry.pTags); // TODO: remove after debug
|
|
||||||
if (tEncodeTag(pCoder, (const STag *)pME->ctbEntry.pTags) < 0) return -1;
|
if (tEncodeTag(pCoder, (const STag *)pME->ctbEntry.pTags) < 0) return -1;
|
||||||
} else if (pME->type == TSDB_NORMAL_TABLE) {
|
} else if (pME->type == TSDB_NORMAL_TABLE) {
|
||||||
if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1;
|
if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1;
|
||||||
if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1;
|
if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1;
|
||||||
if (tEncodeI32(pCoder, pME->ntbEntry.commentLen) < 0) return -1;
|
if (tEncodeI32v(pCoder, pME->ntbEntry.commentLen) < 0) return -1;
|
||||||
if (pME->ntbEntry.commentLen > 0){
|
if (pME->ntbEntry.commentLen > 0){
|
||||||
if (tEncodeCStr(pCoder, pME->ntbEntry.comment) < 0) return -1;
|
if (tEncodeCStr(pCoder, pME->ntbEntry.comment) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
@ -64,23 +67,26 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
|
||||||
if (tDecodeCStr(pCoder, &pME->name) < 0) return -1;
|
if (tDecodeCStr(pCoder, &pME->name) < 0) return -1;
|
||||||
|
|
||||||
if (pME->type == TSDB_SUPER_TABLE) {
|
if (pME->type == TSDB_SUPER_TABLE) {
|
||||||
|
if (tDecodeI8(pCoder, &pME->flags) < 0) return -1; // TODO: need refactor?
|
||||||
if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaRow) < 0) return -1;
|
if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaRow) < 0) return -1;
|
||||||
if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaTag) < 0) return -1;
|
if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaTag) < 0) return -1;
|
||||||
|
if (TABLE_IS_ROLLUP(pME->flags)) {
|
||||||
|
if (tDecodeSRSmaParam(pCoder, &pME->stbEntry.rsmaParam) < 0) return -1;
|
||||||
|
}
|
||||||
} else if (pME->type == TSDB_CHILD_TABLE) {
|
} else if (pME->type == TSDB_CHILD_TABLE) {
|
||||||
if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1;
|
if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1;
|
||||||
if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1;
|
if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1;
|
||||||
if (tDecodeI32(pCoder, &pME->ctbEntry.commentLen) < 0) return -1;
|
if (tDecodeI32v(pCoder, &pME->ctbEntry.commentLen) < 0) return -1;
|
||||||
if (pME->ctbEntry.commentLen > 0){
|
if (pME->ctbEntry.commentLen > 0){
|
||||||
if (tDecodeCStr(pCoder, &pME->ctbEntry.comment) < 0)
|
if (tDecodeCStr(pCoder, &pME->ctbEntry.comment) < 0)
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (tDecodeI64(pCoder, &pME->ctbEntry.suid) < 0) return -1;
|
if (tDecodeI64(pCoder, &pME->ctbEntry.suid) < 0) return -1;
|
||||||
if (tDecodeTag(pCoder, (STag **)&pME->ctbEntry.pTags) < 0) return -1; // (TODO)
|
if (tDecodeTag(pCoder, (STag **)&pME->ctbEntry.pTags) < 0) return -1; // (TODO)
|
||||||
debugCheckTags((STag*)pME->ctbEntry.pTags); // TODO: remove after debug
|
|
||||||
} else if (pME->type == TSDB_NORMAL_TABLE) {
|
} else if (pME->type == TSDB_NORMAL_TABLE) {
|
||||||
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1;
|
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1;
|
||||||
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
|
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
|
||||||
if (tDecodeI32(pCoder, &pME->ntbEntry.commentLen) < 0) return -1;
|
if (tDecodeI32v(pCoder, &pME->ntbEntry.commentLen) < 0) return -1;
|
||||||
if (pME->ntbEntry.commentLen > 0){
|
if (pME->ntbEntry.commentLen > 0){
|
||||||
if (tDecodeCStr(pCoder, &pME->ntbEntry.comment) < 0) return -1;
|
if (tDecodeCStr(pCoder, &pME->ntbEntry.comment) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -342,6 +342,7 @@ SMStbCursor *metaOpenStbCursor(SMeta *pMeta, tb_uid_t suid) {
|
||||||
|
|
||||||
pStbCur = (SMStbCursor *)taosMemoryCalloc(1, sizeof(*pStbCur));
|
pStbCur = (SMStbCursor *)taosMemoryCalloc(1, sizeof(*pStbCur));
|
||||||
if (pStbCur == NULL) {
|
if (pStbCur == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -351,6 +352,7 @@ SMStbCursor *metaOpenStbCursor(SMeta *pMeta, tb_uid_t suid) {
|
||||||
|
|
||||||
ret = tdbTbcOpen(pMeta->pSuidIdx, &pStbCur->pCur, NULL);
|
ret = tdbTbcOpen(pMeta->pSuidIdx, &pStbCur->pCur, NULL);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
metaULock(pMeta);
|
metaULock(pMeta);
|
||||||
taosMemoryFree(pStbCur);
|
taosMemoryFree(pStbCur);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -139,6 +139,10 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
me.name = pReq->name;
|
me.name = pReq->name;
|
||||||
me.stbEntry.schemaRow = pReq->schemaRow;
|
me.stbEntry.schemaRow = pReq->schemaRow;
|
||||||
me.stbEntry.schemaTag = pReq->schemaTag;
|
me.stbEntry.schemaTag = pReq->schemaTag;
|
||||||
|
if (pReq->rollup) {
|
||||||
|
TABLE_SET_ROLLUP(me.flags);
|
||||||
|
me.stbEntry.rsmaParam = pReq->rsmaParam;
|
||||||
|
}
|
||||||
|
|
||||||
if (metaHandleEntry(pMeta, &me) < 0) goto _err;
|
if (metaHandleEntry(pMeta, &me) < 0) goto _err;
|
||||||
|
|
||||||
|
|
|
@ -156,6 +156,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
|
||||||
|
|
||||||
static void tdDestroyTSmaStat(STSmaStat *pStat) {
|
static void tdDestroyTSmaStat(STSmaStat *pStat) {
|
||||||
if (pStat) {
|
if (pStat) {
|
||||||
|
smaDebug("destroy tsma stat");
|
||||||
tDestroyTSma(pStat->pTSma);
|
tDestroyTSma(pStat->pTSma);
|
||||||
taosMemoryFreeClear(pStat->pTSma);
|
taosMemoryFreeClear(pStat->pTSma);
|
||||||
taosMemoryFreeClear(pStat->pTSchema);
|
taosMemoryFreeClear(pStat->pTSchema);
|
||||||
|
@ -170,15 +171,12 @@ static void *tdFreeTSmaStat(STSmaStat *pStat) {
|
||||||
|
|
||||||
static void tdDestroyRSmaStat(SRSmaStat *pStat) {
|
static void tdDestroyRSmaStat(SRSmaStat *pStat) {
|
||||||
if (pStat) {
|
if (pStat) {
|
||||||
smaDebug("vgId:%d, %s:%d free rsma stat", SMA_VID(pStat->pSma), __func__, __LINE__);
|
smaDebug("vgId:%d destroy rsma stat", SMA_VID(pStat->pSma));
|
||||||
// step 1: set persistence task cancelled
|
// step 1: set persistence task cancelled
|
||||||
atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED);
|
atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED);
|
||||||
|
|
||||||
// step 2: clean timer
|
// step 2: stop the persistence timer
|
||||||
taosTmrStopA(&RSMA_TMR_ID(pStat));
|
taosTmrStopA(&RSMA_TMR_ID(pStat));
|
||||||
if (RSMA_TMR_HANDLE(pStat)) {
|
|
||||||
taosTmrCleanUp(RSMA_TMR_HANDLE(pStat));
|
|
||||||
}
|
|
||||||
|
|
||||||
// step 3: wait the persistence thread to finish
|
// step 3: wait the persistence thread to finish
|
||||||
int32_t nLoops = 0;
|
int32_t nLoops = 0;
|
||||||
|
@ -194,7 +192,6 @@ static void tdDestroyRSmaStat(SRSmaStat *pStat) {
|
||||||
sched_yield();
|
sched_yield();
|
||||||
nLoops = 0;
|
nLoops = 0;
|
||||||
}
|
}
|
||||||
taosMsleep(1000); // TODO: remove this line when release
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -219,7 +216,11 @@ static void tdDestroyRSmaStat(SRSmaStat *pStat) {
|
||||||
sched_yield();
|
sched_yield();
|
||||||
nLoops = 0;
|
nLoops = 0;
|
||||||
}
|
}
|
||||||
taosMsleep(1000); // TODO: remove this line when release
|
}
|
||||||
|
|
||||||
|
// step 6: cleanup the timer handle
|
||||||
|
if (RSMA_TMR_HANDLE(pStat)) {
|
||||||
|
taosTmrCleanUp(RSMA_TMR_HANDLE(pStat));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -245,16 +246,12 @@ void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
|
||||||
int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
|
int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
|
||||||
if (pSmaStat) {
|
if (pSmaStat) {
|
||||||
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
|
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
|
||||||
smaDebug("%s:%d destroy tsma stat", __func__, __LINE__);
|
|
||||||
tdDestroyTSmaStat(SMA_TSMA_STAT(pSmaStat));
|
tdDestroyTSmaStat(SMA_TSMA_STAT(pSmaStat));
|
||||||
} else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
|
} else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
|
||||||
smaDebug("%s:%d destroy rsma stat", __func__, __LINE__);
|
|
||||||
tdDestroyRSmaStat(SMA_RSMA_STAT(pSmaStat));
|
tdDestroyRSmaStat(SMA_RSMA_STAT(pSmaStat));
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
smaDebug("%s:%d no need to destroy rsma stat", __func__, __LINE__);
|
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
static int32_t smaEvalDays(SRetention *r, int8_t precision);
|
static int32_t smaEvalDays(SRetention *r, int8_t precision);
|
||||||
static int32_t smaSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type);
|
static int32_t smaSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type);
|
||||||
|
static int32_t rsmaRestore(SSma *pSma);
|
||||||
|
|
||||||
#define SMA_SET_KEEP_CFG(l) \
|
#define SMA_SET_KEEP_CFG(l) \
|
||||||
do { \
|
do { \
|
||||||
|
@ -100,6 +101,9 @@ int32_t smaOpen(SVnode *pVnode) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pVnode->pSma = pSma;
|
||||||
|
|
||||||
pSma->pVnode = pVnode;
|
pSma->pVnode = pVnode;
|
||||||
taosThreadMutexInit(&pSma->mutex, NULL);
|
taosThreadMutexInit(&pSma->mutex, NULL);
|
||||||
pSma->locked = false;
|
pSma->locked = false;
|
||||||
|
@ -117,9 +121,13 @@ int32_t smaOpen(SVnode *pVnode) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// restore the rsma
|
||||||
|
if (rsmaRestore(pSma) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pVnode->pSma = pSma;
|
|
||||||
return 0;
|
return 0;
|
||||||
_err:
|
_err:
|
||||||
taosMemoryFreeClear(pSma);
|
taosMemoryFreeClear(pSma);
|
||||||
|
@ -127,7 +135,7 @@ _err:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t smaCloseEnv(SSma *pSma) {
|
int32_t smaCloseEnv(SSma *pSma) {
|
||||||
if(pSma) {
|
if (pSma) {
|
||||||
SMA_TSMA_ENV(pSma) = tdFreeSmaEnv(SMA_TSMA_ENV(pSma));
|
SMA_TSMA_ENV(pSma) = tdFreeSmaEnv(SMA_TSMA_ENV(pSma));
|
||||||
SMA_RSMA_ENV(pSma) = tdFreeSmaEnv(SMA_RSMA_ENV(pSma));
|
SMA_RSMA_ENV(pSma) = tdFreeSmaEnv(SMA_RSMA_ENV(pSma));
|
||||||
}
|
}
|
||||||
|
@ -157,9 +165,39 @@ int32_t smaClose(SSma *pSma) {
|
||||||
* @param pSma
|
* @param pSma
|
||||||
* @return int32_t
|
* @return int32_t
|
||||||
*/
|
*/
|
||||||
int32_t smaRestore(SSma *pSma) {
|
static int32_t rsmaRestore(SSma *pSma) {
|
||||||
if (!pSma) return 0;
|
ASSERT(VND_IS_RSMA(pSma->pVnode));
|
||||||
|
|
||||||
// iterate all stables to restore the rsma env
|
// iterate all stables to restore the rsma env
|
||||||
|
SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t));
|
||||||
|
if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) {
|
||||||
|
smaError("failed to restore rsma since get stb id list error: %s", terrstr());
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
SMetaReader mr = {0};
|
||||||
|
metaReaderInit(&mr, SMA_META(pSma), 0);
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(suidList); ++i) {
|
||||||
|
tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
|
||||||
|
smaDebug("suid [%d] is %" PRIi64, i, suid);
|
||||||
|
if (metaGetTableEntryByUid(&mr, suid) < 0) {
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
taosArrayDestroy(suidList);
|
||||||
|
smaError("failed to get table meta for %" PRIi64 " since %s", suid, terrstr());
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
ASSERT(mr.me.type == TSDB_SUPER_TABLE);
|
||||||
|
if (TABLE_IS_ROLLUP(mr.me.flags)) {
|
||||||
|
SRSmaParam *param = &mr.me.stbEntry.rsmaParam;
|
||||||
|
for (int i = 0; i < 2; ++i) {
|
||||||
|
smaDebug("vgId: %d table:%" PRIi64 " maxdelay[%d]:%" PRIi64 " watermark[%d]:%" PRIi64, TD_VID(pSma->pVnode),
|
||||||
|
suid, i, param->maxdelay[i], i, param->watermark[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
taosArrayDestroy(suidList);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
|
@ -49,10 +49,10 @@ static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle, int32_t vgId,
|
||||||
// Note: free/kill may in RC
|
// Note: free/kill may in RC
|
||||||
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
|
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
|
||||||
if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
|
if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
|
||||||
smaDebug("vgId:%d, %s:%d free qTaskInfo_t %p of level %d", vgId, __func__, __LINE__, otaskHandle, level);
|
smaDebug("vgId:%d, free qTaskInfo_t %p of level %d", vgId, otaskHandle, level);
|
||||||
qDestroyTask(otaskHandle);
|
qDestroyTask(otaskHandle);
|
||||||
} else {
|
} else {
|
||||||
smaDebug("vgId:%d, %s:%d not free qTaskInfo_t %p of level %d", vgId, __func__, __LINE__, otaskHandle, level);
|
smaDebug("vgId:%d, not free qTaskInfo_t %p of level %d", vgId, otaskHandle, level);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -89,7 +89,7 @@ static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids) {
|
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids) {
|
||||||
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||||
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
|
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
|
||||||
SRSmaInfo *pRSmaInfo = NULL;
|
SRSmaInfo *pRSmaInfo = NULL;
|
||||||
|
@ -246,7 +246,7 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
|
||||||
|
|
||||||
SMeta *pMeta = pVnode->pMeta;
|
SMeta *pMeta = pVnode->pMeta;
|
||||||
SMsgCb *pMsgCb = &pVnode->msgCb;
|
SMsgCb *pMsgCb = &pVnode->msgCb;
|
||||||
SRSmaParam *param = &pReq->pRSmaParam;
|
SRSmaParam *param = &pReq->rsmaParam;
|
||||||
|
|
||||||
if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) {
|
if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) {
|
||||||
smaWarn("vgId:%d, no qmsg1/qmsg2 for rollup stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid);
|
smaWarn("vgId:%d, no qmsg1/qmsg2 for rollup stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid);
|
||||||
|
@ -502,8 +502,10 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFreeClear(pReq);
|
taosMemoryFreeClear(pReq);
|
||||||
|
} else if (terrno == 0) {
|
||||||
|
smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level);
|
||||||
} else {
|
} else {
|
||||||
smaDebug("vgId:%d, no rsma %" PRIi8 " data generated since %s", SMA_VID(pSma), pItem->level, tstrerror(terrno));
|
smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, tstrerror(terrno));
|
||||||
}
|
}
|
||||||
|
|
||||||
tdDestroySDataBlockArray(pResult);
|
tdDestroySDataBlockArray(pResult);
|
||||||
|
@ -526,16 +528,16 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||||
|
|
||||||
int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat));
|
int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat));
|
||||||
if (rsmaTriggerStat == TASK_TRIGGER_STAT_CANCELLED || rsmaTriggerStat == TASK_TRIGGER_STAT_FINISHED) {
|
if (rsmaTriggerStat == TASK_TRIGGER_STAT_CANCELLED || rsmaTriggerStat == TASK_TRIGGER_STAT_FINISHED) {
|
||||||
smaDebug("vgId:%d, %s:%d level %" PRIi8 " not fetch since stat is cancelled for table suid:%" PRIi64, SMA_VID(pSma),
|
smaDebug("vgId:%d, level %" PRIi8 " not fetch since stat is cancelled for table suid:%" PRIi64, SMA_VID(pSma),
|
||||||
__func__, __LINE__, pItem->level, pItem->pRsmaInfo->suid);
|
pItem->level, pItem->pRsmaInfo->suid);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int8_t fetchTriggerStat =
|
int8_t fetchTriggerStat =
|
||||||
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
|
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
|
||||||
if (fetchTriggerStat == TASK_TRIGGER_STAT_ACTIVE) {
|
if (fetchTriggerStat == TASK_TRIGGER_STAT_ACTIVE) {
|
||||||
smaDebug("vgId:%d, %s:%d level %" PRIi8 " stat is active for table suid:%" PRIi64, SMA_VID(pSma), __func__,
|
smaDebug("vgId:%d, level %" PRIi8 " stat is active for table suid:%" PRIi64, SMA_VID(pSma), pItem->level,
|
||||||
__LINE__, pItem->level, pItem->pRsmaInfo->suid);
|
pItem->pRsmaInfo->suid);
|
||||||
|
|
||||||
tdRefSmaStat(pSma, (SSmaStat *)pStat);
|
tdRefSmaStat(pSma, (SSmaStat *)pStat);
|
||||||
|
|
||||||
|
@ -546,13 +548,13 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||||
tdUnRefSmaStat(pSma, (SSmaStat *)pStat);
|
tdUnRefSmaStat(pSma, (SSmaStat *)pStat);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
smaDebug("vgId:%d, %s:%d level %" PRIi8 " stat is inactive for table suid:%" PRIi64, SMA_VID(pSma), __func__,
|
smaDebug("vgId:%d, level %" PRIi8 " stat is inactive for table suid:%" PRIi64, SMA_VID(pSma), pItem->level,
|
||||||
__LINE__, pItem->level, pItem->pRsmaInfo->suid);
|
pItem->pRsmaInfo->suid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem,
|
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem, tb_uid_t suid,
|
||||||
tb_uid_t suid, int8_t level) {
|
int8_t level) {
|
||||||
if (!pItem || !pItem->taskInfo) {
|
if (!pItem || !pItem->taskInfo) {
|
||||||
smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid);
|
smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -568,7 +570,7 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3
|
||||||
|
|
||||||
tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SUBMIT_BLOCK);
|
tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SUBMIT_BLOCK);
|
||||||
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
|
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
|
||||||
smaDebug("vgId:%d, %s:%d process rsma insert", SMA_VID(pSma), __func__, __LINE__);
|
smaDebug("vgId:%d, process rsma insert", SMA_VID(pSma));
|
||||||
|
|
||||||
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||||
SRSmaStat *pStat = SMA_RSMA_STAT(pEnv->pStat);
|
SRSmaStat *pStat = SMA_RSMA_STAT(pEnv->pStat);
|
||||||
|
@ -661,18 +663,6 @@ static void *tdRSmaPersistExec(void *param) {
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t));
|
|
||||||
if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
} else {
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(suidList); ++i) {
|
|
||||||
tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
|
|
||||||
smaDebug("suid [%d] is %" PRIi64, i, suid);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
void *infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
|
void *infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
|
||||||
if (!infoHash) {
|
if (!infoHash) {
|
||||||
goto _end;
|
goto _end;
|
||||||
|
@ -833,7 +823,7 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) {
|
||||||
if (TASK_TRIGGER_STAT_CANCELLED != atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
|
if (TASK_TRIGGER_STAT_CANCELLED != atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
|
||||||
TASK_TRIGGER_STAT_CANCELLED,
|
TASK_TRIGGER_STAT_CANCELLED,
|
||||||
TASK_TRIGGER_STAT_FINISHED)) {
|
TASK_TRIGGER_STAT_FINISHED)) {
|
||||||
smaDebug("%s:%d rsma persistence start since active", __func__, __LINE__);
|
smaDebug("rsma persistence start since active");
|
||||||
tdRSmaPersistTask(pRSmaStat);
|
tdRSmaPersistTask(pRSmaStat);
|
||||||
taosTmrReset(tdRSmaPersistTrigger, RSMA_QTASK_PERSIST_MS, pRSmaStat, pRSmaStat->tmrHandle, &pRSmaStat->tmrId);
|
taosTmrReset(tdRSmaPersistTrigger, RSMA_QTASK_PERSIST_MS, pRSmaStat, pRSmaStat->tmrHandle, &pRSmaStat->tmrId);
|
||||||
} else {
|
} else {
|
||||||
|
@ -842,16 +832,17 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) {
|
||||||
} break;
|
} break;
|
||||||
case TASK_TRIGGER_STAT_CANCELLED: {
|
case TASK_TRIGGER_STAT_CANCELLED: {
|
||||||
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_FINISHED);
|
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_FINISHED);
|
||||||
smaDebug("%s:%d rsma persistence not start since cancelled and finished", __func__, __LINE__);
|
smaDebug("rsma persistence not start since cancelled and finished");
|
||||||
} break;
|
} break;
|
||||||
case TASK_TRIGGER_STAT_INACTIVE: {
|
case TASK_TRIGGER_STAT_INACTIVE: {
|
||||||
smaDebug("%s:%d rsma persistence not start since inactive", __func__, __LINE__);
|
smaDebug("rsma persistence not start since inactive");
|
||||||
} break;
|
} break;
|
||||||
case TASK_TRIGGER_STAT_INIT: {
|
case TASK_TRIGGER_STAT_INIT: {
|
||||||
smaDebug("%s:%d rsma persistence not start since init", __func__, __LINE__);
|
smaDebug("rsma persistence not start since init");
|
||||||
} break;
|
} break;
|
||||||
default: {
|
default: {
|
||||||
smaWarn("%s:%d rsma persistence not start since unknown stat %" PRIi8, __func__, __LINE__, tmrStat);
|
smaWarn("rsma persistence not start since unknown stat %" PRIi8, tmrStat);
|
||||||
|
ASSERT(0);
|
||||||
} break;
|
} break;
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -2886,6 +2886,9 @@ int32_t tsdbGetCtbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
|
||||||
*/
|
*/
|
||||||
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
|
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
|
||||||
SMStbCursor* pCur = metaOpenStbCursor(pMeta, suid);
|
SMStbCursor* pCur = metaOpenStbCursor(pMeta, suid);
|
||||||
|
if(!pCur) {
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
tb_uid_t id = metaStbCursorNext(pCur);
|
tb_uid_t id = metaStbCursorNext(pCur);
|
||||||
|
|
|
@ -172,9 +172,9 @@ void vnodeClose(SVnode *pVnode) {
|
||||||
vnodeSyncClose(pVnode);
|
vnodeSyncClose(pVnode);
|
||||||
vnodeQueryClose(pVnode);
|
vnodeQueryClose(pVnode);
|
||||||
walClose(pVnode->pWal);
|
walClose(pVnode->pWal);
|
||||||
smaCloseEx(pVnode->pSma);
|
|
||||||
tqClose(pVnode->pTq);
|
tqClose(pVnode->pTq);
|
||||||
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
|
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
|
||||||
|
smaCloseEx(pVnode->pSma);
|
||||||
metaClose(pVnode->pMeta);
|
metaClose(pVnode->pMeta);
|
||||||
vnodeCloseBufPool(pVnode);
|
vnodeCloseBufPool(pVnode);
|
||||||
// destroy handle
|
// destroy handle
|
||||||
|
|
|
@ -811,7 +811,7 @@ int32_t getMaximumIdleDurationSec();
|
||||||
* nOptrWithVal: *nOptrWithVal save the number of optr with value
|
* nOptrWithVal: *nOptrWithVal save the number of optr with value
|
||||||
* return: result code, 0 means success
|
* return: result code, 0 means success
|
||||||
*/
|
*/
|
||||||
int32_t encodeOperator(SOperatorInfo* ops, char** data, int32_t *length);
|
int32_t encodeOperator(SOperatorInfo* ops, char** data, int32_t *length, int32_t *nOptrWithVal);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ops: root operator, created by caller
|
* ops: root operator, created by caller
|
||||||
|
|
|
@ -222,7 +222,13 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
return encodeOperator(pTaskInfo->pRoot, pOutput, len);
|
int32_t nOptrWithVal = 0;
|
||||||
|
int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
|
||||||
|
if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal = 0)) {
|
||||||
|
taosMemoryFreeClear(*pOutput);
|
||||||
|
*len = 0;
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) {
|
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) {
|
||||||
|
|
|
@ -4472,12 +4472,12 @@ int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHa
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length) {
|
int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32_t* nOptrWithVal) {
|
||||||
int32_t code = TDB_CODE_SUCCESS;
|
int32_t code = TDB_CODE_SUCCESS;
|
||||||
char* pCurrent = NULL;
|
char* pCurrent = NULL;
|
||||||
int32_t currLength = 0;
|
int32_t currLength = 0;
|
||||||
if (ops->fpSet.encodeResultRow) {
|
if (ops->fpSet.encodeResultRow) {
|
||||||
if (result == NULL || length == NULL) {
|
if (result == NULL || length == NULL || nOptrWithVal == NULL) {
|
||||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||||
}
|
}
|
||||||
code = ops->fpSet.encodeResultRow(ops, &pCurrent, &currLength);
|
code = ops->fpSet.encodeResultRow(ops, &pCurrent, &currLength);
|
||||||
|
@ -4488,8 +4488,13 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length) {
|
||||||
*result = NULL;
|
*result = NULL;
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
|
} else if (currLength == 0) {
|
||||||
|
ASSERT(!pCurrent);
|
||||||
|
goto _downstream;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
++(*nOptrWithVal);
|
||||||
|
|
||||||
ASSERT(currLength >= 0);
|
ASSERT(currLength >= 0);
|
||||||
|
|
||||||
if (*result == NULL) {
|
if (*result == NULL) {
|
||||||
|
@ -4516,8 +4521,10 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length) {
|
||||||
taosMemoryFree(pCurrent);
|
taosMemoryFree(pCurrent);
|
||||||
*length = *(int32_t*)(*result);
|
*length = *(int32_t*)(*result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_downstream:
|
||||||
for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
|
for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
|
||||||
code = encodeOperator(ops->pDownstream[i], result, length);
|
code = encodeOperator(ops->pDownstream[i], result, length, nOptrWithVal);
|
||||||
if (code != TDB_CODE_SUCCESS) {
|
if (code != TDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue