data check: udf

This commit is contained in:
xsren 2024-11-11 11:04:11 +08:00
parent 1c01bd801b
commit bf9ea14f86
3 changed files with 31 additions and 8 deletions

View File

@ -83,6 +83,13 @@ extern "C" {
} \ } \
} while (0) } while (0)
#define TAOS_UDF_CHECK_CONDITION(o, code) \
do { \
if ((o) == false) { \
fnError("Condition not met.line:%d", __LINE__); \
return code; \
} \
} while (0)
// low level APIs // low level APIs
/** /**

View File

@ -1019,6 +1019,8 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
} }
block->info.rows = udfCol->colData.numOfRows; block->info.rows = udfCol->colData.numOfRows;
code = blockDataCheck(block);
TAOS_CHECK_GOTO(code, &lino, _exit);
_exit: _exit:
if (code != 0) { if (code != 0) {
fnError("failed to convert udf column to data block, code:%d, line:%d", code, lino); fnError("failed to convert udf column to data block, code:%d, line:%d", code, lino);

View File

@ -451,10 +451,7 @@ int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) {
} }
void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) { void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) {
if (plugin == NULL) { TAOS_UDF_CHECK_PTR_RVOID(plugin);
fnError("udf script c plugin is NULL");
return;
}
if (plugin->closeFunc) { if (plugin->closeFunc) {
if (plugin->closeFunc() != 0) { if (plugin->closeFunc() != 0) {
fnError("udf script c plugin close func failed.line:%d", __LINE__); fnError("udf script c plugin close func failed.line:%d", __LINE__);
@ -473,10 +470,7 @@ void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) {
} }
void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) { void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) {
if (plugin == NULL) { TAOS_UDF_CHECK_PTR_RVOID(plugin);
fnError("udf script c plugin is NULL");
return;
}
if (plugin->closeFunc) { if (plugin->closeFunc) {
if (plugin->closeFunc() != 0) { if (plugin->closeFunc() != 0) {
fnError("udf script python plugin close func failed.line:%d", __LINE__); fnError("udf script python plugin close func failed.line:%d", __LINE__);
@ -548,6 +542,7 @@ void udfdDeinitScriptPlugins() {
} }
void udfdProcessRequest(uv_work_t *req) { void udfdProcessRequest(uv_work_t *req) {
TAOS_UDF_CHECK_PTR_RVOID(req);
if (req == NULL) { if (req == NULL) {
fnError("udf request is NULL"); fnError("udf request is NULL");
return; return;
@ -804,6 +799,23 @@ static int32_t checkUDFScalaResult(SSDataBlock *block, SUdfColumn *output) {
fnError("udf scala result num of rows %d not equal to input rows %" PRId64, output->colData.numOfRows, block->info.rows); fnError("udf scala result num of rows %d not equal to input rows %" PRId64, output->colData.numOfRows, block->info.rows);
return TSDB_CODE_UDF_FUNC_EXEC_FAILURE; return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
} }
for (int32_t i = 0; i < output->colData.numOfRows; ++i) {
if (tsSafetyCheckLevel < TSDB_SAFETY_CHECK_LEVELL_BYROW) break;
if (!udfColDataIsNull(output, i)) {
if (IS_VAR_DATA_TYPE(output->colMeta.type)) {
TAOS_UDF_CHECK_CONDITION(output->colData.varLenCol.payload != NULL, TSDB_CODE_UDF_FUNC_EXEC_FAILURE);
TAOS_UDF_CHECK_CONDITION(output->colData.varLenCol.varOffsets[i] >= 0 &&
output->colData.varLenCol.varOffsets[i] < output->colData.varLenCol.payloadLen,
TSDB_CODE_UDF_FUNC_EXEC_FAILURE);
} else {
TAOS_UDF_CHECK_CONDITION(
output->colMeta.bytes * output->colData.numOfRows <= output->colData.fixLenCol.dataLen,
TSDB_CODE_UDF_FUNC_EXEC_FAILURE);
break;
}
}
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -815,6 +827,8 @@ static int32_t checkUDFAggResult(SSDataBlock *block, SUdfInterBuf *output) {
fnError("udf agg result num of rows %d not equal to 1", output->numOfResult); fnError("udf agg result num of rows %d not equal to 1", output->numOfResult);
return TSDB_CODE_UDF_FUNC_EXEC_FAILURE; return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
} }
TAOS_UDF_CHECK_CONDITION(output->buf != NULL, TSDB_CODE_UDF_FUNC_EXEC_FAILURE);
TAOS_UDF_CHECK_CONDITION(output->bufLen > 0, TSDB_CODE_UDF_FUNC_EXEC_FAILURE);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }