fix: sanitize udf memory usage

This commit is contained in:
slzhou 2022-05-17 11:13:11 +08:00
parent 47f0c6db6a
commit c94c43951e
2 changed files with 16 additions and 7 deletions

View File

@ -795,7 +795,6 @@ int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SS
} }
output->info.hasVarCol = hasVarCol; output->info.hasVarCol = hasVarCol;
//TODO: free the array output->pDataBlock
output->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); output->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
taosArrayPush(output->pDataBlock, (input + i)->columnData); taosArrayPush(output->pDataBlock, (input + i)->columnData);
@ -809,8 +808,12 @@ int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
return -1; return -1;
} }
output->numOfRows = input->info.rows; output->numOfRows = input->info.rows;
//TODO: memory
output->columnData = taosArrayGet(input->pDataBlock, 0); output->columnData = taosMemoryMalloc(sizeof(SColumnInfoData));
memcpy(output->columnData,
taosArrayGet(input->pDataBlock, 0),
sizeof(SColumnInfoData));
return 0; return 0;
} }
@ -1427,7 +1430,10 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t
int32_t err = callUdf(handle, callType, &inputBlock, NULL, NULL, &resultBlock, NULL); int32_t err = callUdf(handle, callType, &inputBlock, NULL, NULL, &resultBlock, NULL);
if (err == 0) { if (err == 0) {
convertDataBlockToScalarParm(&resultBlock, output); convertDataBlockToScalarParm(&resultBlock, output);
taosArrayDestroy(resultBlock.pDataBlock);
} }
taosArrayDestroy(inputBlock.pDataBlock);
return err; return err;
} }
@ -1508,16 +1514,15 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
udfcRunUdfUvTask(task, UV_TASK_REQ_RSP); udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
SUdfTeardownResponse *rsp = &task->_teardown.rsp;
int32_t err = task->errCode; int32_t err = task->errCode;
udfcRunUdfUvTask(task, UV_TASK_DISCONNECT); udfcRunUdfUvTask(task, UV_TASK_DISCONNECT);
fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle);
taosMemoryFree(task->session); taosMemoryFree(task->session);
taosMemoryFree(task); taosMemoryFree(task);
fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle);
return err; return err;
} }

View File

@ -77,11 +77,15 @@ int main(int argc, char *argv[]) {
input.columnData = taosArrayGet(pBlock->pDataBlock, 0); input.columnData = taosArrayGet(pBlock->pDataBlock, 0);
SScalarParam output = {0}; SScalarParam output = {0};
doCallUdfScalarFunc(handle, &input, 1, &output); doCallUdfScalarFunc(handle, &input, 1, &output);
taosArrayDestroy(pBlock->pDataBlock);
SColumnInfoData *col = output.columnData; SColumnInfoData *col = output.columnData;
for (int32_t i = 0; i < output.numOfRows; ++i) { for (int32_t i = 0; i < output.numOfRows; ++i) {
fprintf(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t))); fprintf(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t)));
} }
colDataDestroy(output.columnData);
taosMemoryFree(output.columnData);
doTeardownUdf(handle); doTeardownUdf(handle);
udfcClose(); udfcClose();
} }