enh: rsma support delete raw data
This commit is contained in:
parent
82d3deb423
commit
a9d30f25d2
|
@ -278,6 +278,7 @@ int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
|
||||||
|
|
||||||
int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq);
|
int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq);
|
||||||
int32_t tdProcessRSmaSubmit(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len, int32_t inputType);
|
int32_t tdProcessRSmaSubmit(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len, int32_t inputType);
|
||||||
|
int32_t tdProcessRSmaDelete(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len, int32_t inputType);
|
||||||
int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq);
|
int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq);
|
||||||
int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
|
int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
|
||||||
int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore, bool isAdd);
|
int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore, bool isAdd);
|
||||||
|
|
|
@ -669,7 +669,7 @@ _exit:
|
||||||
*/
|
*/
|
||||||
static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType,
|
static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType,
|
||||||
SRSmaInfo *pInfo, tb_uid_t suid) {
|
SRSmaInfo *pInfo, tb_uid_t suid) {
|
||||||
int32_t size = sizeof(int32_t) + sizeof(int64_t) + len;
|
int32_t size = sizeof(int8_t) + sizeof(int32_t) + sizeof(int64_t) + len; // type + len + version + payload
|
||||||
void *qItem = taosAllocateQitem(size, DEF_QITEM, 0);
|
void *qItem = taosAllocateQitem(size, DEF_QITEM, 0);
|
||||||
|
|
||||||
if (!qItem) {
|
if (!qItem) {
|
||||||
|
@ -678,6 +678,8 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *p
|
||||||
|
|
||||||
void *pItem = qItem;
|
void *pItem = qItem;
|
||||||
|
|
||||||
|
*(int8_t *)pItem = (int8_t)inputType;
|
||||||
|
pItem = POINTER_SHIFT(pItem, sizeof(int8_t));
|
||||||
*(int32_t *)pItem = len;
|
*(int32_t *)pItem = len;
|
||||||
pItem = POINTER_SHIFT(pItem, sizeof(int32_t));
|
pItem = POINTER_SHIFT(pItem, sizeof(int32_t));
|
||||||
*(int64_t *)pItem = version;
|
*(int64_t *)pItem = version;
|
||||||
|
@ -852,7 +854,7 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg,
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
|
if (inputType == STREAM_INPUT__DATA_SUBMIT || inputType == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||||
if (tdExecuteRSmaImplAsync(pSma, version, pMsg, len, inputType, pRSmaInfo, suid) < 0) {
|
if (tdExecuteRSmaImplAsync(pSma, version, pMsg, len, inputType, pRSmaInfo, suid) < 0) {
|
||||||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
@ -918,6 +920,25 @@ _err:
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tdProcessRSmaDelete(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len, int32_t inputType) {
|
||||||
|
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||||
|
if (!pEnv) {
|
||||||
|
// only applicable when rsma env exists
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (inputType == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||||
|
SDeleteRes *pReq = pReq;
|
||||||
|
if (tdExecuteRSmaAsync(pSma, version, pMsg, len, inputType, pReq->suid) < 0) {
|
||||||
|
smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr());
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
_err:
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief retrieve rsma meta and init
|
* @brief retrieve rsma meta and init
|
||||||
*
|
*
|
||||||
|
@ -1203,7 +1224,7 @@ _end:
|
||||||
static void tdFreeRSmaSubmitItems(SArray *pItems) {
|
static void tdFreeRSmaSubmitItems(SArray *pItems) {
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) {
|
||||||
SPackedData *packData = taosArrayGet(pItems, i);
|
SPackedData *packData = taosArrayGet(pItems, i);
|
||||||
taosFreeQitem(POINTER_SHIFT(packData->msgStr, -sizeof(int32_t) - sizeof(int64_t)));
|
taosFreeQitem(POINTER_SHIFT(packData->msgStr, -sizeof(int8_t) - sizeof(int32_t) - sizeof(int64_t)));
|
||||||
}
|
}
|
||||||
taosArrayClear(pItems);
|
taosArrayClear(pItems);
|
||||||
}
|
}
|
||||||
|
@ -1267,24 +1288,52 @@ _err:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray *pSubmitArr, ERsmaExecType type) {
|
static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray *pSubmitArr, ERsmaExecType type) {
|
||||||
taosArrayClear(pSubmitArr);
|
|
||||||
while (1) {
|
|
||||||
void *msg = NULL;
|
void *msg = NULL;
|
||||||
|
int8_t resume = 0;
|
||||||
|
int32_t nSubmit = 0;
|
||||||
|
int32_t nDelete = 0;
|
||||||
|
|
||||||
|
SPackedData packData;
|
||||||
|
|
||||||
|
taosArrayClear(pSubmitArr);
|
||||||
|
|
||||||
|
while (1) {
|
||||||
taosGetQitem(qall, (void **)&msg);
|
taosGetQitem(qall, (void **)&msg);
|
||||||
if (msg) {
|
if (msg) {
|
||||||
SPackedData packData = {.msgLen = *(int32_t *)msg,
|
int8_t inputType = *(int8_t *)msg;
|
||||||
.ver = *(int64_t *)POINTER_SHIFT(msg, sizeof(int32_t)),
|
|
||||||
.msgStr = POINTER_SHIFT(msg, sizeof(int32_t) + sizeof(int64_t))};
|
|
||||||
|
|
||||||
|
msg = POINTER_SHIFT(msg, sizeof(int8_t));
|
||||||
|
|
||||||
|
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
|
if (nDelete > 0) {
|
||||||
|
resume = 1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
_resume_submit:
|
||||||
|
packData.msgLen = *(int32_t *)msg;
|
||||||
|
packData.ver = *(int64_t *)POINTER_SHIFT(msg, sizeof(int32_t));
|
||||||
|
packData.msgStr = POINTER_SHIFT(msg, sizeof(int32_t) + sizeof(int64_t));
|
||||||
if (!taosArrayPush(pSubmitArr, &packData)) {
|
if (!taosArrayPush(pSubmitArr, &packData)) {
|
||||||
tdFreeRSmaSubmitItems(pSubmitArr);
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
++nSubmit;
|
||||||
|
} else if (inputType == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||||
|
if (nSubmit > 0) {
|
||||||
|
resume = 2;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
_resume_delete:
|
||||||
|
++nDelete;
|
||||||
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (nSubmit > 0) {
|
||||||
int32_t size = taosArrayGetSize(pSubmitArr);
|
int32_t size = taosArrayGetSize(pSubmitArr);
|
||||||
if (size > 0) {
|
if (size > 0) {
|
||||||
for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
|
for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
|
||||||
|
@ -1294,6 +1343,26 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
|
||||||
}
|
}
|
||||||
tdFreeRSmaSubmitItems(pSubmitArr);
|
tdFreeRSmaSubmitItems(pSubmitArr);
|
||||||
}
|
}
|
||||||
|
} else if (nDelete > 0) {
|
||||||
|
}
|
||||||
|
|
||||||
|
if (resume == 0) {
|
||||||
|
goto _rtn;
|
||||||
|
} else if (resume == 1) {
|
||||||
|
nSubmit = 0;
|
||||||
|
nDelete = 0;
|
||||||
|
resume = 0;
|
||||||
|
taosArrayClear(pSubmitArr);
|
||||||
|
goto _resume_submit;
|
||||||
|
} else {
|
||||||
|
nSubmit = 0;
|
||||||
|
nDelete = 0;
|
||||||
|
resume = 0;
|
||||||
|
taosArrayClear(pSubmitArr);
|
||||||
|
goto _resume_delete;
|
||||||
|
}
|
||||||
|
|
||||||
|
_rtn:
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
_err:
|
_err:
|
||||||
smaError("vgId:%d, batch exec for suid:%" PRIi64 " execType:%d size:%d failed since %s", SMA_VID(pSma), pInfo->suid,
|
smaError("vgId:%d, batch exec for suid:%" PRIi64 " execType:%d size:%d failed since %s", SMA_VID(pSma), pInfo->suid,
|
||||||
|
|
|
@ -1884,6 +1884,8 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, in
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tdProcessRSmaDelete(pVnode->pSma, ver, pRes, pReq, len, STREAM_INPUT__REF_DATA_BLOCK);
|
||||||
|
|
||||||
tDecoderClear(pCoder);
|
tDecoderClear(pCoder);
|
||||||
taosArrayDestroy(pRes->uidList);
|
taosArrayDestroy(pRes->uidList);
|
||||||
|
|
||||||
|
|
|
@ -75,6 +75,15 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf
|
||||||
taosArrayPush(pInfo->pBlockLists, &tmp);
|
taosArrayPush(pInfo->pBlockLists, &tmp);
|
||||||
}
|
}
|
||||||
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
|
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
|
||||||
|
} else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||||
|
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||||
|
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
|
||||||
|
SPackedData tmp = {
|
||||||
|
.pDataBlock = pDataBlock,
|
||||||
|
};
|
||||||
|
taosArrayPush(pInfo->pBlockLists, &tmp);
|
||||||
|
}
|
||||||
|
pInfo->blockType = STREAM_INPUT__REF_DATA_BLOCK;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
Loading…
Reference in New Issue