enh: support delete msg for rsma
This commit is contained in:
parent
9cf154543f
commit
572eb691fb
|
@ -160,7 +160,7 @@ int32_t tqResetStreamTaskStatus(STQ* pTq);
|
|||
int32_t tqStopStreamTasks(STQ* pTq);
|
||||
|
||||
// tq util
|
||||
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock);
|
||||
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int8_t type);
|
||||
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
|
||||
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
|
||||
int32_t type, int64_t sver, int64_t ever);
|
||||
|
|
|
@ -286,8 +286,8 @@ int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
|
|||
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
|
||||
|
||||
int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq);
|
||||
int32_t tdProcessRSmaSubmit(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len, int32_t inputType);
|
||||
int32_t tdProcessRSmaDelete(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len, int32_t inputType);
|
||||
int32_t tdProcessRSmaSubmit(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len);
|
||||
int32_t tdProcessRSmaDelete(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len);
|
||||
int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq);
|
||||
int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
|
||||
int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore, bool isAdd);
|
||||
|
|
|
@ -217,10 +217,7 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) {
|
|||
int32_t lino = 0;
|
||||
SVnode *pVnode = pSma->pVnode;
|
||||
|
||||
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
|
||||
if (!pSmaEnv) {
|
||||
goto _exit;
|
||||
}
|
||||
if (!SMA_RSMA_ENV(pSma)) goto _exit;
|
||||
|
||||
code = tsdbCommitBegin(VND_RSMA1(pVnode), pInfo);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
|
|
@ -43,7 +43,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSiz
|
|||
ERsmaExecType type, int8_t level);
|
||||
static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid);
|
||||
static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
|
||||
static void tdFreeRSmaSubmitItems(SArray *pItems);
|
||||
static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type);
|
||||
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo);
|
||||
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo,
|
||||
int32_t execType, int8_t *streamFlushed);
|
||||
|
@ -723,7 +723,7 @@ _exit:
|
|||
*/
|
||||
static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType,
|
||||
SRSmaInfo *pInfo, tb_uid_t suid) {
|
||||
int32_t size = RSMA_SUBMIT_HEAD_LEN + len; // header(type+len+version) + payload
|
||||
int32_t size = RSMA_SUBMIT_HEAD_LEN + len; // header + payload
|
||||
void *qItem = taosAllocateQitem(size, DEF_QITEM, 0);
|
||||
|
||||
if (!qItem) {
|
||||
|
@ -940,12 +940,8 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg,
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len, int32_t inputType) {
|
||||
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||
if (!pEnv) {
|
||||
// only applicable when rsma env exists
|
||||
return TDB_CODE_SUCCESS;
|
||||
}
|
||||
int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len) {
|
||||
if (!SMA_RSMA_ENV(pSma)) return TSDB_CODE_SUCCESS;
|
||||
|
||||
if ((terrno = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) {
|
||||
smaError("vgId:%d, failed to process rsma submit since invalid exec code: %s", SMA_VID(pSma), terrstr());
|
||||
|
@ -954,27 +950,25 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg,
|
|||
|
||||
STbUidStore uidStore = {0};
|
||||
|
||||
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
|
||||
if (tdFetchSubmitReqSuids(pReq, &uidStore) < 0) {
|
||||
smaError("vgId:%d, failed to process rsma submit fetch suid since: %s", SMA_VID(pSma), terrstr());
|
||||
if (tdFetchSubmitReqSuids(pReq, &uidStore) < 0) {
|
||||
smaError("vgId:%d, failed to process rsma submit fetch suid since: %s", SMA_VID(pSma), terrstr());
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (uidStore.suid != 0) {
|
||||
if (tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, uidStore.suid) < 0) {
|
||||
smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr());
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (uidStore.suid != 0) {
|
||||
if (tdExecuteRSmaAsync(pSma, version, pMsg, len, inputType, uidStore.suid) < 0) {
|
||||
smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr());
|
||||
void *pIter = NULL;
|
||||
while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) {
|
||||
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
|
||||
if (tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, *pTbSuid) < 0) {
|
||||
smaError("vgId:%d, failed to process rsma submit exec 2 since: %s", SMA_VID(pSma), terrstr());
|
||||
taosHashCancelIterate(uidStore.uidHash, pIter);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
void *pIter = NULL;
|
||||
while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) {
|
||||
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
|
||||
if (tdExecuteRSmaAsync(pSma, version, pMsg, len, inputType, *pTbSuid) < 0) {
|
||||
smaError("vgId:%d, failed to process rsma submit exec 2 since: %s", SMA_VID(pSma), terrstr());
|
||||
taosHashCancelIterate(uidStore.uidHash, pIter);
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
tdUidStoreDestory(&uidStore);
|
||||
|
@ -984,19 +978,18 @@ _err:
|
|||
return terrno;
|
||||
}
|
||||
|
||||
int32_t tdProcessRSmaDelete(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len, int32_t inputType) {
|
||||
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||
if (!pEnv) {
|
||||
// only applicable when rsma env exists
|
||||
return TSDB_CODE_SUCCESS;
|
||||
int32_t tdProcessRSmaDelete(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len) {
|
||||
if (!SMA_RSMA_ENV(pSma)) return TSDB_CODE_SUCCESS;
|
||||
|
||||
if ((terrno = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) {
|
||||
smaError("vgId:%d, failed to process rsma delete since invalid exec code: %s", SMA_VID(pSma), terrstr());
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (inputType == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||
SDeleteRes *pReq = pReq;
|
||||
if (tdExecuteRSmaAsync(pSma, version, pMsg, len, inputType, pReq->suid) < 0) {
|
||||
smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr());
|
||||
goto _err;
|
||||
}
|
||||
SDeleteRes *pDelRes = pReq;
|
||||
if (tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__REF_DATA_BLOCK, pDelRes->suid) < 0) {
|
||||
smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr());
|
||||
goto _err;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
|
@ -1381,10 +1374,20 @@ _end:
|
|||
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId);
|
||||
}
|
||||
|
||||
static void tdFreeRSmaSubmitItems(SArray *pItems) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) {
|
||||
SPackedData *packData = taosArrayGet(pItems, i);
|
||||
taosFreeQitem(POINTER_SHIFT(packData->msgStr, -RSMA_SUBMIT_HEAD_LEN));
|
||||
static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type) {
|
||||
int32_t arrSize = taosArrayGetSize(pItems);
|
||||
if (type == STREAM_INPUT__MERGED_SUBMIT) {
|
||||
for (int32_t i = 0; i < arrSize; ++i) {
|
||||
SPackedData *packData = TARRAY_GET_ELEM(pItems, i);
|
||||
taosFreeQitem(POINTER_SHIFT(packData->msgStr, -RSMA_SUBMIT_HEAD_LEN));
|
||||
}
|
||||
} else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||
for (int32_t i = 0; i < arrSize; ++i) {
|
||||
SPackedData *packData = TARRAY_GET_ELEM(pItems, i);
|
||||
blockDataDestroy(packData->pDataBlock);
|
||||
}
|
||||
} else {
|
||||
ASSERTS(0, "unknown type:%d", type);
|
||||
}
|
||||
taosArrayClear(pItems);
|
||||
}
|
||||
|
@ -1448,34 +1451,37 @@ _err:
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
#define RSMA_SUBMIT_MSG_TYPE(msg) (*(int8_t *)(msg))
|
||||
#define RSMA_SUBMIT_MSG_LEN(msg) (*(int32_t *)POINTER_SHIFT((msg), sizeof(int8_t)))
|
||||
#define RSMA_SUBMIT_MSG_VER(msg) (*(int64_t *)POINTER_SHIFT((msg), sizeof(int8_t) + sizeof(int32_t)))
|
||||
#define RSMA_SUBMIT_MSG_BODY(msg) (POINTER_SHIFT((msg), sizeof(int8_t) + sizeof(int32_t) + sizeof(int64_t)))
|
||||
|
||||
static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray *pSubmitArr, ERsmaExecType type) {
|
||||
|
||||
void *msg = NULL;
|
||||
int8_t resume = 0;
|
||||
int32_t nSubmit = 0;
|
||||
int32_t nDelete = 0;
|
||||
void *msg = NULL;
|
||||
int8_t resume = 0;
|
||||
int32_t nSubmit = 0;
|
||||
int32_t nDelete = 0;
|
||||
|
||||
SPackedData packData;
|
||||
|
||||
taosArrayClear(pSubmitArr);
|
||||
|
||||
// the submitReq/deleteReq msg may exsit alternately in the msg queue, consume them sequentially in batch mode
|
||||
while (1) {
|
||||
taosGetQitem(qall, (void **)&msg);
|
||||
if (msg) {
|
||||
int8_t inputType = *(int8_t *)msg;
|
||||
|
||||
msg = POINTER_SHIFT(msg, sizeof(int8_t));
|
||||
|
||||
int8_t inputType = RSMA_SUBMIT_MSG_TYPE(msg);
|
||||
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
|
||||
if (nDelete > 0) {
|
||||
resume = 1;
|
||||
break;
|
||||
}
|
||||
_resume_submit:
|
||||
packData.msgLen = *(int32_t *)msg;
|
||||
packData.ver = *(int64_t *)POINTER_SHIFT(msg, sizeof(int32_t));
|
||||
packData.msgStr = POINTER_SHIFT(msg, sizeof(int32_t) + sizeof(int64_t));
|
||||
packData.msgLen = RSMA_SUBMIT_MSG_LEN(msg);
|
||||
packData.ver = RSMA_SUBMIT_MSG_VER(msg);
|
||||
packData.msgStr = RSMA_SUBMIT_MSG_BODY(msg);
|
||||
if (!taosArrayPush(pSubmitArr, &packData)) {
|
||||
taosFreeQitem(msg);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
@ -1486,46 +1492,49 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
|
|||
break;
|
||||
}
|
||||
_resume_delete:
|
||||
++nDelete;
|
||||
#if 0
|
||||
if (!taosArrayPush(pSubmitArr, &packData)) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tdFreeRSmaSubmitItems(pSubmitArr);
|
||||
goto _err;
|
||||
}
|
||||
#endif
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (nSubmit > 0) {
|
||||
int32_t size = taosArrayGetSize(pSubmitArr);
|
||||
if (size > 0) {
|
||||
for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
|
||||
if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) {
|
||||
extractDelDataBlock(RSMA_SUBMIT_MSG_BODY(msg), RSMA_SUBMIT_MSG_LEN(msg), RSMA_SUBMIT_MSG_VER(msg),
|
||||
&packData.pDataBlock, 1);
|
||||
if (!taosArrayPush(pSubmitArr, &packData)) {
|
||||
taosFreeQitem(msg);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
taosFreeQitem(msg);
|
||||
++nDelete;
|
||||
} else {
|
||||
ASSERTS(0, "unknown msg type:%d", inputType);
|
||||
break;
|
||||
}
|
||||
tdFreeRSmaSubmitItems(pSubmitArr);
|
||||
}
|
||||
} else if (nDelete > 0) {
|
||||
}
|
||||
|
||||
if (resume == 0) {
|
||||
goto _rtn;
|
||||
} else if (resume == 1) {
|
||||
nSubmit = 0;
|
||||
nDelete = 0;
|
||||
resume = 0;
|
||||
taosArrayClear(pSubmitArr);
|
||||
goto _resume_submit;
|
||||
} else {
|
||||
nSubmit = 0;
|
||||
nDelete = 0;
|
||||
resume = 0;
|
||||
taosArrayClear(pSubmitArr);
|
||||
goto _resume_delete;
|
||||
if (nSubmit > 0 || nDelete > 0) {
|
||||
int32_t size = TARRAY_SIZE(pSubmitArr);
|
||||
if (size > 0) {
|
||||
int32_t inputType = nSubmit > 0 ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK;
|
||||
for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
|
||||
if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, inputType, pInfo, type, i) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
tdFreeRSmaSubmitItems(pSubmitArr, inputType);
|
||||
}
|
||||
}
|
||||
|
||||
if (resume == 0) {
|
||||
goto _rtn;
|
||||
} else if (resume == 1) {
|
||||
nSubmit = 0;
|
||||
nDelete = 0;
|
||||
resume = 0;
|
||||
tdFreeRSmaSubmitItems(pSubmitArr, STREAM_INPUT__REF_DATA_BLOCK);
|
||||
goto _resume_submit;
|
||||
} else {
|
||||
nSubmit = 0;
|
||||
nDelete = 0;
|
||||
resume = 0;
|
||||
tdFreeRSmaSubmitItems(pSubmitArr, STREAM_INPUT__MERGED_SUBMIT);
|
||||
goto _resume_delete;
|
||||
}
|
||||
}
|
||||
|
||||
_rtn:
|
||||
|
@ -1534,7 +1543,7 @@ _err:
|
|||
atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, terrno);
|
||||
smaError("vgId:%d, batch exec for suid:%" PRIi64 " execType:%d size:%d failed since %s", SMA_VID(pSma), pInfo->suid,
|
||||
type, (int32_t)taosArrayGetSize(pSubmitArr), terrstr());
|
||||
tdFreeRSmaSubmitItems(pSubmitArr);
|
||||
tdFreeRSmaSubmitItems(pSubmitArr, nSubmit ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK);
|
||||
while (1) {
|
||||
void *msg = NULL;
|
||||
taosGetQitem(qall, (void **)&msg);
|
||||
|
|
|
@ -343,7 +343,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
|
|||
void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
|
||||
int32_t len = pCont->bodyLen - sizeof(SMsgHead);
|
||||
|
||||
code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
|
||||
code = extractDelDataBlock(pBody, len, ver, (void**)pItem, 0);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
if (*pItem == NULL) {
|
||||
tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver);
|
||||
|
|
|
@ -399,7 +399,7 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) {
|
||||
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int8_t type) {
|
||||
SDecoder* pCoder = &(SDecoder){0};
|
||||
SDeleteRes* pRes = &(SDeleteRes){0};
|
||||
|
||||
|
@ -442,14 +442,19 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream
|
|||
}
|
||||
|
||||
taosArrayDestroy(pRes->uidList);
|
||||
*pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
|
||||
if (*pRefBlock == NULL) {
|
||||
blockDataCleanup(pDelBlock);
|
||||
taosMemoryFree(pDelBlock);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
if (type == 0) {
|
||||
*pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
|
||||
if (*pRefBlock == NULL) {
|
||||
blockDataCleanup(pDelBlock);
|
||||
taosMemoryFree(pDelBlock);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK;
|
||||
((SStreamRefDataBlock*)(*pRefBlock))->pBlock = pDelBlock;
|
||||
} else {
|
||||
*pRefBlock = pDelBlock;
|
||||
}
|
||||
|
||||
(*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK;
|
||||
(*pRefBlock)->pBlock = pDelBlock;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -380,7 +380,7 @@ static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
SEncoder *pCoder = &(SEncoder){0};
|
||||
SDeleteRes res = {0};
|
||||
|
||||
SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
||||
SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb, .skipRollup = 1};
|
||||
initStorageAPI(&handle.api);
|
||||
|
||||
code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
|
||||
|
@ -1669,7 +1669,7 @@ _exit:
|
|||
atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1);
|
||||
if (code == 0) {
|
||||
atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1);
|
||||
code = tdProcessRSmaSubmit(pVnode->pSma, ver, pSubmitReq, pReq, len, STREAM_INPUT__DATA_SUBMIT);
|
||||
code = tdProcessRSmaSubmit(pVnode->pSma, ver, pSubmitReq, pReq, len);
|
||||
}
|
||||
|
||||
// clear
|
||||
|
@ -1947,7 +1947,7 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, in
|
|||
if (code) goto _err;
|
||||
}
|
||||
|
||||
tdProcessRSmaDelete(pVnode->pSma, ver, pRes, pReq, len, STREAM_INPUT__REF_DATA_BLOCK);
|
||||
tdProcessRSmaDelete(pVnode->pSma, ver, pRes, pReq, len);
|
||||
|
||||
tDecoderClear(pCoder);
|
||||
taosArrayDestroy(pRes->uidList);
|
||||
|
|
|
@ -78,12 +78,11 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf
|
|||
taosArrayPush(pInfo->pBlockLists, &tmp);
|
||||
pInfo->blockType = STREAM_INPUT__CHECKPOINT;
|
||||
} else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
|
||||
SPackedData tmp = {.pDataBlock = pDataBlock};
|
||||
taosArrayPush(pInfo->pBlockLists, &tmp);
|
||||
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||
SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
|
||||
taosArrayPush(pInfo->pBlockLists, pReq);
|
||||
}
|
||||
pInfo->blockType = STREAM_INPUT__REF_DATA_BLOCK;
|
||||
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
Loading…
Reference in New Issue