refact: tsma/rsma process

This commit is contained in:
kailixu 2022-12-08 14:16:57 +08:00
parent 74a253eac4
commit 096e7054f0
2 changed files with 15 additions and 8 deletions

View File

@ -750,12 +750,14 @@ _err:
*/
static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t len, int32_t inputType, SRSmaInfo *pInfo,
tb_uid_t suid) {
void *qItem = taosAllocateQitem(len, DEF_QITEM);
int32_t size = sizeof(int32_t) + len;
void *qItem = taosAllocateQitem(size, DEF_QITEM);
if (!qItem) {
return TSDB_CODE_FAILED;
}
memcpy(qItem, pMsg, len);
*(int32_t *)qItem = len;
memcpy(POINTER_SHIFT(qItem, sizeof(int32_t)), pMsg, len);
taosWriteQitem(pInfo->queue, qItem);
@ -1367,7 +1369,8 @@ _end:
static void tdFreeRSmaSubmitItems(SArray *pItems) {
for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) {
taosFreeQitem(*(void **)taosArrayGet(pItems, i));
SPackedData *packData = taosArrayGet(pItems, i);
taosFreeQitem(POINTER_SHIFT(packData->msgStr, -sizeof(int32_t)));
}
taosArrayClear(pItems);
}
@ -1436,7 +1439,8 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
void *msg = NULL;
taosGetQitem(qall, (void **)&msg);
if (msg) {
if (!taosArrayPush(pSubmitArr, &msg)) {
SPackedData packData = {.msgLen = *(int32_t *)msg, .msgStr = POINTER_SHIFT(msg, sizeof(int32_t))};
if (!taosArrayPush(pSubmitArr, &packData)) {
tdFreeRSmaSubmitItems(pSubmitArr);
goto _err;
}
@ -1491,7 +1495,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
}
if (!(pSubmitArr =
taosArrayInit(TMIN(RSMA_SUBMIT_BATCH_SIZE, atomic_load_64(&pRSmaStat->nBufItems)), POINTER_BYTES))) {
taosArrayInit(TMIN(RSMA_SUBMIT_BATCH_SIZE, atomic_load_64(&pRSmaStat->nBufItems)), sizeof(SPackedData)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}

View File

@ -51,8 +51,8 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf
if (type == STREAM_INPUT__MERGED_SUBMIT) {
for (int32_t i = 0; i < numOfBlocks; i++) {
SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*));
taosArrayPush(pInfo->pBlockLists, &pReq);
SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
taosArrayPush(pInfo->pBlockLists, pReq);
}
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
@ -61,7 +61,10 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf
} else if (type == STREAM_INPUT__DATA_BLOCK) {
for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
taosArrayPush(pInfo->pBlockLists, &pDataBlock);
SPackedData tmp = {
.pDataBlock = pDataBlock,
};
taosArrayPush(pInfo->pBlockLists, &tmp);
}
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
}