Merge pull request #28628 from taosdata/feat/contrib30

enh: set parent environ to udfd
This commit is contained in:
Shengliang Guan 2024-11-04 14:32:58 +08:00 committed by GitHub
commit ac5d09baa9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 169 additions and 108 deletions

View File

@ -40,7 +40,7 @@ typedef struct SUdfdData {
#ifdef WINDOWS
HANDLE jobHandle;
#endif
int spawnErr;
int32_t spawnErr;
uv_pipe_t ctrlPipe;
uv_async_t stopAsync;
int32_t stopCalled;
@ -51,15 +51,17 @@ typedef struct SUdfdData {
SUdfdData udfdGlobal = {0};
int32_t udfStartUdfd(int32_t startDnodeId);
void udfStopUdfd();
void udfStopUdfd();
extern char **environ;
static int32_t udfSpawnUdfd(SUdfdData *pData);
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal);
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal);
static void udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg);
static void udfUdfdStopAsyncCb(uv_async_t *async);
static void udfWatchUdfd(void *args);
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal) {
fnInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
SUdfdData *pData = process->data;
if (exitStatus == 0 && termSignal == 0 || atomic_load_32(&pData->stopCalled)) {
@ -67,7 +69,7 @@ void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
} else {
fnInfo("udfd process restart");
int32_t code = udfSpawnUdfd(pData);
if(code != 0) {
if (code != 0) {
fnError("udfd process restart failed with code:%d", code);
}
}
@ -75,6 +77,8 @@ void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
static int32_t udfSpawnUdfd(SUdfdData *pData) {
fnInfo("start to init udfd");
int32_t err = 0;
uv_process_options_t options = {0};
char path[PATH_MAX] = {0};
@ -126,17 +130,17 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
char thrdPoolSizeEnvItem[32] = {0};
snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId);
float numCpuCores = 4;
float numCpuCores = 4;
int32_t code = taosGetCpuCores(&numCpuCores, false);
if(code != 0) {
fnError("failed to get cpu cores, code:%d", code);
if (code != 0) {
fnError("failed to get cpu cores, code:0x%x", code);
}
numCpuCores = TMAX(numCpuCores, 2);
snprintf(thrdPoolSizeEnvItem, 32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores * 2);
snprintf(thrdPoolSizeEnvItem, 32, "%s=%d", "UV_THREADPOOL_SIZE", (int32_t)numCpuCores * 2);
char pathTaosdLdLib[512] = {0};
size_t taosdLdLibPathLen = sizeof(pathTaosdLdLib);
int ret = uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen);
char pathTaosdLdLib[512] = {0};
size_t taosdLdLibPathLen = sizeof(pathTaosdLdLib);
int32_t ret = uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen);
if (ret != UV_ENOBUFS) {
taosdLdLibPathLen = strlen(pathTaosdLdLib);
}
@ -158,8 +162,8 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
char *taosFqdnEnvItem = NULL;
char *taosFqdn = getenv("TAOS_FQDN");
if (taosFqdn != NULL) {
int subLen = strlen(taosFqdn);
int len = strlen("TAOS_FQDN=") + subLen + 1;
int32_t subLen = strlen(taosFqdn);
int32_t len = strlen("TAOS_FQDN=") + subLen + 1;
taosFqdnEnvItem = taosMemoryMalloc(len);
if (taosFqdnEnvItem != NULL) {
tstrncpy(taosFqdnEnvItem, "TAOS_FQDN=", len);
@ -171,11 +175,53 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
}
}
char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, ldLibPathEnvItem,taosFqdnEnvItem, NULL};
char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, ldLibPathEnvItem, taosFqdnEnvItem, NULL};
options.env = envUdfd;
char **envUdfdWithPEnv = NULL;
if (environ != NULL) {
int32_t lenEnvUdfd = ARRAY_SIZE(envUdfd);
int32_t numEnviron = 0;
while (environ[numEnviron] != NULL) {
numEnviron++;
}
int err = uv_spawn(&pData->loop, &pData->process, &options);
envUdfdWithPEnv = (char **)taosMemoryCalloc(numEnviron + lenEnvUdfd, sizeof(char *));
if (envUdfdWithPEnv == NULL) {
err = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
for (int32_t i = 0; i < numEnviron; i++) {
int32_t len = strlen(environ[i]) + 1;
envUdfdWithPEnv[i] = (char *)taosMemoryCalloc(len, 1);
if (envUdfdWithPEnv[i] == NULL) {
err = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
tstrncpy(envUdfdWithPEnv[i], environ[i], len);
}
for (int32_t i = 0; i < lenEnvUdfd; i++) {
if (envUdfd[i] != NULL) {
int32_t len = strlen(envUdfd[i]) + 1;
envUdfdWithPEnv[numEnviron + i] = (char *)taosMemoryCalloc(len, 1);
if (envUdfdWithPEnv[numEnviron + i] == NULL) {
err = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
tstrncpy(envUdfdWithPEnv[numEnviron + i], envUdfd[i], len);
}
}
envUdfdWithPEnv[numEnviron + lenEnvUdfd - 1] = NULL;
options.env = envUdfdWithPEnv;
} else {
options.env = envUdfd;
}
err = uv_spawn(&pData->loop, &pData->process, &options);
pData->process.data = (void *)pData;
#ifdef WINDOWS
@ -202,7 +248,21 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
} else {
fnInfo("udfd is initialized");
}
if(taosFqdnEnvItem) taosMemoryFree(taosFqdnEnvItem);
_OVER:
if (taosFqdnEnvItem) {
taosMemoryFree(taosFqdnEnvItem);
}
if (envUdfdWithPEnv != NULL) {
int32_t i = 0;
while (envUdfdWithPEnv[i] != NULL) {
taosMemoryFree(envUdfdWithPEnv[i]);
i++;
}
taosMemoryFree(envUdfdWithPEnv);
}
return err;
}
@ -225,13 +285,13 @@ static void udfWatchUdfd(void *args) {
TAOS_UV_CHECK_ERRNO(udfSpawnUdfd(pData));
atomic_store_32(&pData->spawnErr, 0);
(void)uv_barrier_wait(&pData->barrier);
int num = uv_run(&pData->loop, UV_RUN_DEFAULT);
int32_t num = uv_run(&pData->loop, UV_RUN_DEFAULT);
fnInfo("udfd loop exit with %d active handles, line:%d", num, __LINE__);
uv_walk(&pData->loop, udfUdfdCloseWalkCb, NULL);
num = uv_run(&pData->loop, UV_RUN_DEFAULT);
fnInfo("udfd loop exit with %d active handles, line:%d", num, __LINE__);
if(uv_loop_close(&pData->loop) != 0) {
if (uv_loop_close(&pData->loop) != 0) {
fnError("udfd loop close failed, lino:%d", __LINE__);
}
return;
@ -240,7 +300,7 @@ _exit:
if (terrno != 0) {
(void)uv_barrier_wait(&pData->barrier);
atomic_store_32(&pData->spawnErr, terrno);
if(uv_loop_close(&pData->loop) != 0) {
if (uv_loop_close(&pData->loop) != 0) {
fnError("udfd loop close failed, lino:%d", __LINE__);
}
fnError("udfd thread exit with code:%d lino:%d", terrno, terrln);
@ -271,10 +331,10 @@ int32_t udfStartUdfd(int32_t startDnodeId) {
int32_t err = atomic_load_32(&pData->spawnErr);
if (err != 0) {
uv_barrier_destroy(&pData->barrier);
if(uv_async_send(&pData->stopAsync) != 0) {
if (uv_async_send(&pData->stopAsync) != 0) {
fnError("start udfd: failed to send stop async");
}
if(uv_thread_join(&pData->thread)!= 0) {
if (uv_thread_join(&pData->thread) != 0) {
fnError("start udfd: failed to join udfd thread");
}
pData->needCleanUp = false;
@ -299,10 +359,10 @@ void udfStopUdfd() {
atomic_store_32(&pData->stopCalled, 1);
pData->needCleanUp = false;
uv_barrier_destroy(&pData->barrier);
if(uv_async_send(&pData->stopAsync) != 0) {
if (uv_async_send(&pData->stopAsync) != 0) {
fnError("stop udfd: failed to send stop async");
}
if(uv_thread_join(&pData->thread) != 0) {
if (uv_thread_join(&pData->thread) != 0) {
fnError("stop udfd: failed to join udfd thread");
}
@ -341,7 +401,7 @@ typedef void *QUEUE[2];
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))
/* Public macros. */
#define QUEUE_DATA(ptr, type, field) ((type *)((char *)(ptr)-offsetof(type, field)))
#define QUEUE_DATA(ptr, type, field) ((type *)((char *)(ptr) - offsetof(type, field)))
/* Important note: mutating the list while QUEUE_FOREACH is
* iterating over its elements results in undefined behavior.
@ -434,8 +494,8 @@ typedef struct SUdfcProxy {
QUEUE uvProcTaskQueue;
uv_mutex_t udfStubsMutex;
SArray *udfStubs; // SUdfcFuncStub
SArray *expiredUdfStubs; //SUdfcFuncStub
SArray *udfStubs; // SUdfcFuncStub
SArray *expiredUdfStubs; // SUdfcFuncStub
uv_mutex_t udfcUvMutex;
int8_t initialized;
@ -458,7 +518,7 @@ typedef struct SUdfcUvSession {
typedef struct SClientUvTaskNode {
SUdfcProxy *udfc;
int8_t type;
int errCode;
int32_t errCode;
uv_pipe_t *pipe;
@ -516,7 +576,7 @@ enum {
UDFC_STATE_STOPPING, // stopping after udfcClose
};
void getUdfdPipeName(char *pipeName, int32_t size);
void getUdfdPipeName(char *pipeName, int32_t size);
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup);
void *decodeUdfSetupRequest(const void *buf, SUdfSetupRequest *request);
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf *state);
@ -801,12 +861,12 @@ void *decodeUdfResponse(const void *buf, SUdfResponse *rsp) {
buf = decodeUdfTeardownResponse(buf, &rsp->teardownRsp);
break;
default:
rsp->code = TSDB_CODE_UDF_INTERNAL_ERROR;
rsp->code = TSDB_CODE_UDF_INTERNAL_ERROR;
fnError("decode udf response, invalid udf response type %d", rsp->type);
break;
}
if(buf == NULL) {
rsp->code = terrno;
if (buf == NULL) {
rsp->code = terrno;
fnError("decode udf response failed, code:0x%x", rsp->code);
}
return (void *)buf;
@ -847,12 +907,12 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
udfBlock->numOfRows = block->info.rows;
udfBlock->numOfCols = taosArrayGetSize(block->pDataBlock);
udfBlock->udfCols = taosMemoryCalloc(taosArrayGetSize(block->pDataBlock), sizeof(SUdfColumn *));
if((udfBlock->udfCols) == NULL) {
if ((udfBlock->udfCols) == NULL) {
return terrno;
}
for (int32_t i = 0; i < udfBlock->numOfCols; ++i) {
udfBlock->udfCols[i] = taosMemoryCalloc(1, sizeof(SUdfColumn));
if(udfBlock->udfCols[i] == NULL) {
if (udfBlock->udfCols[i] == NULL) {
return terrno;
}
SColumnInfoData *col = (SColumnInfoData *)taosArrayGet(block->pDataBlock, i);
@ -866,18 +926,18 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) {
udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows;
udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen);
if(udfCol->colData.varLenCol.varOffsets == NULL) {
if (udfCol->colData.varLenCol.varOffsets == NULL) {
return terrno;
}
memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen);
udfCol->colData.varLenCol.payloadLen = colDataGetLength(col, udfBlock->numOfRows);
udfCol->colData.varLenCol.payload = taosMemoryMalloc(udfCol->colData.varLenCol.payloadLen);
if(udfCol->colData.varLenCol.payload == NULL) {
if (udfCol->colData.varLenCol.payload == NULL) {
return terrno;
}
if (col->reassigned) {
for (int32_t row = 0; row < udfCol->colData.numOfRows; ++row) {
char* pColData = col->pData + col->varmeta.offset[row];
char *pColData = col->pData + col->varmeta.offset[row];
int32_t colSize = 0;
if (col->info.type == TSDB_DATA_TYPE_JSON) {
colSize = getJsonValueLen(pColData);
@ -894,7 +954,7 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
udfCol->colData.fixLenCol.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows);
int32_t bitmapLen = udfCol->colData.fixLenCol.nullBitmapLen;
udfCol->colData.fixLenCol.nullBitmap = taosMemoryMalloc(udfCol->colData.fixLenCol.nullBitmapLen);
if(udfCol->colData.fixLenCol.nullBitmap == NULL) {
if (udfCol->colData.fixLenCol.nullBitmap == NULL) {
return terrno;
}
char *bitmap = udfCol->colData.fixLenCol.nullBitmap;
@ -927,11 +987,11 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
code = bdGetColumnInfoData(block, 0, &col);
TAOS_CHECK_GOTO(code, &lino, _exit);
for (int i = 0; i < udfCol->colData.numOfRows; ++i) {
for (int32_t i = 0; i < udfCol->colData.numOfRows; ++i) {
if (udfColDataIsNull(udfCol, i)) {
colDataSetNULL(col, i);
} else {
char* data = udfColDataGetData(udfCol, i);
char *data = udfColDataGetData(udfCol, i);
code = colDataSetVal(col, i, data, false);
TAOS_CHECK_GOTO(code, &lino, _exit);
}
@ -953,32 +1013,32 @@ int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SS
}
// create the basic block info structure
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pInfo = input[i].columnData;
SColumnInfoData d = {0};
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData *pInfo = input[i].columnData;
SColumnInfoData d = {0};
d.info = pInfo->info;
TAOS_CHECK_GOTO(blockDataAppendColInfo(output, &d), &lino, _exit);
}
TAOS_CHECK_GOTO(blockDataEnsureCapacity(output, numOfRows), &lino, _exit);
TAOS_CHECK_GOTO(blockDataEnsureCapacity(output, numOfRows), &lino, _exit);
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDest = taosArrayGet(output->pDataBlock, i);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData *pDest = taosArrayGet(output->pDataBlock, i);
SColumnInfoData* pColInfoData = input[i].columnData;
SColumnInfoData *pColInfoData = input[i].columnData;
TAOS_CHECK_GOTO(colDataAssign(pDest, pColInfoData, input[i].numOfRows, &output->info), &lino, _exit);
if (input[i].numOfRows < numOfRows) {
int32_t startRow = input[i].numOfRows;
int expandRows = numOfRows - startRow;
bool isNull = colDataIsNull_s(pColInfoData, (input+i)->numOfRows - 1);
int32_t expandRows = numOfRows - startRow;
bool isNull = colDataIsNull_s(pColInfoData, (input + i)->numOfRows - 1);
if (isNull) {
colDataSetNNULL(pDest, startRow, expandRows);
} else {
char* src = colDataGetData(pColInfoData, (input + i)->numOfRows - 1);
for (int j = 0; j < expandRows; ++j) {
TAOS_CHECK_GOTO(colDataSetVal(pDest, startRow+j, src, false), &lino, _exit);
char *src = colDataGetData(pColInfoData, (input + i)->numOfRows - 1);
for (int32_t j = 0; j < expandRows; ++j) {
TAOS_CHECK_GOTO(colDataSetVal(pDest, startRow + j, src, false), &lino, _exit);
}
}
}
@ -1000,7 +1060,7 @@ int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
output->numOfRows = input->info.rows;
output->columnData = taosMemoryMalloc(sizeof(SColumnInfoData));
if(output->columnData == NULL) {
if (output->columnData == NULL) {
return terrno;
}
memcpy(output->columnData, taosArrayGet(input->pDataBlock, 0), sizeof(SColumnInfoData));
@ -1012,11 +1072,11 @@ int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
// memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
typedef struct SUdfAggRes {
int8_t finalResNum;
int8_t interResNum;
int8_t finalResNum;
int8_t interResNum;
int32_t interResBufLen;
char *finalResBuf;
char *interResBuf;
char *finalResBuf;
char *interResBuf;
} SUdfAggRes;
void onUdfcPipeClose(uv_handle_t *handle);
@ -1026,8 +1086,8 @@ bool isUdfcUvMsgComplete(SClientConnBuf *connBuf);
void udfcUvHandleRsp(SClientUvConn *conn);
void udfcUvHandleError(SClientUvConn *conn);
void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
void onUdfcPipeWrite(uv_write_t *write, int status);
void onUdfcPipeConnect(uv_connect_t *connect, int status);
void onUdfcPipeWrite(uv_write_t *write, int32_t status);
void onUdfcPipeConnect(uv_connect_t *connect, int32_t status);
int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode *uvTask);
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask);
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask);
@ -1037,7 +1097,7 @@ void udfStopAsyncCb(uv_async_t *async);
void constructUdfService(void *argsThread);
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType);
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle);
int compareUdfcFuncSub(const void *elem1, const void *elem2);
int32_t compareUdfcFuncSub(const void *elem1, const void *elem2);
int32_t doTeardownUdf(UdfcFuncHandle handle);
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
@ -1062,9 +1122,9 @@ int32_t udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pRes
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx);
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
void cleanupNotExpiredUdfs();
void cleanupExpiredUdfs();
int compareUdfcFuncSub(const void *elem1, const void *elem2) {
void cleanupNotExpiredUdfs();
void cleanupExpiredUdfs();
int32_t compareUdfcFuncSub(const void *elem1, const void *elem2) {
SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1;
SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2;
return strcmp(stub1->udfName, stub2->udfName);
@ -1150,21 +1210,22 @@ void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle) {
void cleanupExpiredUdfs() {
int32_t i = 0;
SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
if(expiredUdfStubs == NULL) {
if (expiredUdfStubs == NULL) {
fnError("cleanupExpiredUdfs: failed to init array");
return;
}
while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) {
SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i);
if (stub->refCount == 0) {
fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle,
stub->refCount);
(void)doTeardownUdf(stub->handle);
} else {
fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName,
stub->refCount, stub->createTime, stub->handle);
fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p",
stub->udfName, stub->refCount, stub->createTime, stub->handle);
UdfcFuncHandle handle = stub->handle;
if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
if(taosArrayPush(expiredUdfStubs, stub) == NULL) {
if (taosArrayPush(expiredUdfStubs, stub) == NULL) {
fnError("cleanupExpiredUdfs: failed to push udf stub to array");
}
} else {
@ -1347,7 +1408,8 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
return code;
}
SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
SUdfInterBuf state = {
.buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
SUdfInterBuf newState = {0};
udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState);
@ -1391,8 +1453,9 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) {
udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;
SUdfInterBuf resultBuf = {0};
SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
int32_t udfCallCode = 0;
SUdfInterBuf state = {
.buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
int32_t udfCallCode = 0;
udfCallCode = doCallUdfAggFinalize(session, &state, &resultBuf);
if (udfCallCode != 0) {
fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode);
@ -1448,7 +1511,7 @@ int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *
SUdfResponse rsp = {0};
void *buf = decodeUdfResponse(uvTask->rspBuf.base, &rsp);
code = rsp.code;
if(code != 0) {
if (code != 0) {
fnError("udfc get udf task result failure. code: %d", code);
}
@ -1474,18 +1537,18 @@ int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *
taosMemoryFree(uvTask->rspBuf.base);
} else {
code = uvTask->errCode;
if(code != 0) {
if (code != 0) {
fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__);
}
}
} else if (uvTask->type == UV_TASK_CONNECT) {
code = uvTask->errCode;
if(code != 0) {
if (code != 0) {
fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__);
}
} else if (uvTask->type == UV_TASK_DISCONNECT) {
code = uvTask->errCode;
if(code != 0) {
if (code != 0) {
fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__);
}
}
@ -1620,7 +1683,7 @@ void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
}
}
void onUdfcPipeWrite(uv_write_t *write, int status) {
void onUdfcPipeWrite(uv_write_t *write, int32_t status) {
SClientUvConn *conn = write->data;
if (status < 0) {
fnError("udfc client connection %p write failed. status: %d(%s)", conn, status, uv_strerror(status));
@ -1631,7 +1694,7 @@ void onUdfcPipeWrite(uv_write_t *write, int status) {
taosMemoryFree(write);
}
void onUdfcPipeConnect(uv_connect_t *connect, int status) {
void onUdfcPipeConnect(uv_connect_t *connect, int32_t status) {
SClientUvTaskNode *uvTask = connect->data;
if (status != 0) {
fnError("client connect error, task seq: %" PRId64 ", code: %s", uvTask->seqNum, uv_strerror(status));
@ -1639,7 +1702,7 @@ void onUdfcPipeConnect(uv_connect_t *connect, int status) {
uvTask->errCode = status;
int32_t code = uv_read_start((uv_stream_t *)uvTask->pipe, udfcAllocateBuffer, onUdfcPipeRead);
if(code != 0) {
if (code != 0) {
fnError("udfc client connection %p read start failed. code: %d(%s)", uvTask->pipe, code, uv_strerror(code));
uvTask->errCode = code;
}
@ -1678,13 +1741,12 @@ int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvT
}
request.msgLen = bufLen;
void *bufBegin = taosMemoryMalloc(bufLen);
if(bufBegin == NULL) {
if (bufBegin == NULL) {
fnError("udfc create uv task, malloc buffer failed. size: %d", bufLen);
return terrno;
}
void *buf = bufBegin;
if(encodeUdfRequest(&buf, &request) <= 0)
{
if (encodeUdfRequest(&buf, &request) <= 0) {
fnError("udfc create uv task, encode request failed. size: %d", bufLen);
taosMemoryFree(bufBegin);
return TSDB_CODE_UDF_UV_EXEC_FAILURE;
@ -1695,9 +1757,8 @@ int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvT
} else if (uvTaskType == UV_TASK_DISCONNECT) {
uvTask->pipe = task->session->udfUvPipe;
}
if (uv_sem_init(&uvTask->taskSem, 0) != 0)
{
if (uvTaskType == UV_TASK_REQ_RSP) {
if (uv_sem_init(&uvTask->taskSem, 0) != 0) {
if (uvTaskType == UV_TASK_REQ_RSP) {
taosMemoryFree(uvTask->reqBuf.base);
}
fnError("udfc create uv task, init semaphore failed.");
@ -1733,7 +1794,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
switch (uvTask->type) {
case UV_TASK_CONNECT: {
uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t));
if(pipe == NULL) {
if (pipe == NULL) {
fnError("udfc event loop start connect task malloc pipe failed.");
return terrno;
}
@ -1745,7 +1806,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
uvTask->pipe = pipe;
SClientUvConn *conn = taosMemoryCalloc(1, sizeof(SClientUvConn));
if(conn == NULL) {
if (conn == NULL) {
fnError("udfc event loop start connect task malloc conn failed.");
taosMemoryFree(pipe);
return terrno;
@ -1760,7 +1821,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
pipe->data = conn;
uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t));
if(connReq == NULL) {
if (connReq == NULL) {
fnError("udfc event loop start connect task malloc connReq failed.");
taosMemoryFree(pipe);
taosMemoryFree(conn);
@ -1777,14 +1838,14 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
code = TSDB_CODE_UDF_PIPE_NOT_EXIST;
} else {
uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t));
if(write == NULL) {
if (write == NULL) {
fnError("udfc event loop start req_rsp task malloc write failed.");
return terrno;
}
write->data = pipe->data;
QUEUE *connTaskQueue = &((SClientUvConn *)pipe->data)->taskQueue;
QUEUE_INSERT_TAIL(connTaskQueue, &uvTask->connTaskQueue);
int err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipeWrite);
int32_t err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipeWrite);
if (err != 0) {
taosMemoryFree(write);
fnError("udfc event loop start req_rsp task uv_write failed. uvtask: %p, code: %s", uvTask, uv_strerror(err));
@ -1874,7 +1935,7 @@ void udfStopAsyncCb(uv_async_t *async) {
}
void constructUdfService(void *argsThread) {
int32_t code = 0, lino = 0;
int32_t code = 0, lino = 0;
SUdfcProxy *udfc = (SUdfcProxy *)argsThread;
code = uv_loop_init(&udfc->uvLoop);
TAOS_CHECK_GOTO(code, &lino, _exit);
@ -1891,7 +1952,7 @@ void constructUdfService(void *argsThread) {
QUEUE_INIT(&udfc->uvProcTaskQueue);
(void)uv_barrier_wait(&udfc->initBarrier);
// TODO return value of uv_run
int num = uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
int32_t num = uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
fnInfo("udfc uv loop exit. active handle num: %d", num);
(void)uv_loop_close(&udfc->uvLoop);
@ -1909,7 +1970,7 @@ _exit:
int32_t udfcOpen() {
int32_t code = 0, lino = 0;
int8_t old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 0, 1);
int8_t old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 0, 1);
if (old == 1) {
return 0;
}
@ -1927,12 +1988,12 @@ int32_t udfcOpen() {
code = uv_mutex_init(&proxy->udfStubsMutex);
TAOS_CHECK_GOTO(code, &lino, _exit);
proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
if(proxy->udfStubs == NULL) {
if (proxy->udfStubs == NULL) {
fnError("udfc init failed. udfStubs: %p", proxy->udfStubs);
return -1;
}
proxy->expiredUdfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
if(proxy->expiredUdfStubs == NULL) {
if (proxy->expiredUdfStubs == NULL) {
taosArrayDestroy(proxy->udfStubs);
fnError("udfc init failed. expiredUdfStubs: %p", proxy->expiredUdfStubs);
return -1;
@ -1956,10 +2017,10 @@ int32_t udfcClose() {
SUdfcProxy *udfc = &gUdfcProxy;
udfc->udfcState = UDFC_STATE_STOPPING;
if(uv_async_send(&udfc->loopStopAsync) != 0) {
if (uv_async_send(&udfc->loopStopAsync) != 0) {
fnError("udfc close error to send stop async");
}
if(uv_thread_join(&udfc->loopThread) != 0 ) {
if (uv_thread_join(&udfc->loopThread) != 0) {
fnError("udfc close errir to join loop thread");
}
uv_mutex_destroy(&udfc->taskQueueMutex);
@ -1974,9 +2035,9 @@ int32_t udfcClose() {
}
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
int32_t code = 0, lino = 0;
int32_t code = 0, lino = 0;
SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode));
if(uvTask == NULL) {
if (uvTask == NULL) {
fnError("udfc client task: %p failed to allocate memory for uvTask", task);
return terrno;
}
@ -2006,14 +2067,14 @@ _exit:
}
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
int32_t code = TSDB_CODE_SUCCESS, lino = 0;
int32_t code = TSDB_CODE_SUCCESS, lino = 0;
SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
if(task == NULL) {
if (task == NULL) {
fnError("doSetupUdf, failed to allocate memory for task");
return terrno;
}
task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession));
if(task->session == NULL) {
if (task->session == NULL) {
fnError("doSetupUdf, failed to allocate memory for session");
taosMemoryFree(task);
return terrno;
@ -2059,7 +2120,7 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf
return TSDB_CODE_UDF_PIPE_NOT_EXIST;
}
SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
if(task == NULL) {
if (task == NULL) {
fnError("udfc call udf. failed to allocate memory for task");
return terrno;
}
@ -2163,8 +2224,8 @@ int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdf
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
int8_t callType = TSDB_UDF_CALL_SCALA_PROC;
SSDataBlock inputBlock = {0};
int32_t code = convertScalarParamToDataBlock(input, numOfCols, &inputBlock);
if(code != 0) {
int32_t code = convertScalarParamToDataBlock(input, numOfCols, &inputBlock);
if (code != 0) {
fnError("doCallUdfScalarFunc, convertScalarParamToDataBlock failed. code: %d", code);
return code;
}
@ -2174,13 +2235,13 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t
err = convertDataBlockToScalarParm(&resultBlock, output);
taosArrayDestroy(resultBlock.pDataBlock);
}
blockDataFreeRes(&inputBlock);
return err;
}
int32_t doTeardownUdf(UdfcFuncHandle handle) {
int32_t code = TSDB_CODE_SUCCESS, lino = 0;;
int32_t code = TSDB_CODE_SUCCESS, lino = 0;
SUdfcUvSession *session = (SUdfcUvSession *)handle;
if (session->udfUvPipe == NULL) {
@ -2190,7 +2251,7 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
}
SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
if(task == NULL) {
if (task == NULL) {
fnError("doTeardownUdf, failed to allocate memory for task");
taosMemoryFree(session);
return terrno;