Merge pull request #12216 from taosdata/feature/udf
feature(udf):refactor code and fix bugs
This commit is contained in:
commit
a2e111f2f8
|
@ -44,7 +44,8 @@ enum {
|
||||||
UDFC_CODE_PIPE_READ_ERR = -2,
|
UDFC_CODE_PIPE_READ_ERR = -2,
|
||||||
UDFC_CODE_CONNECT_PIPE_ERR = -3,
|
UDFC_CODE_CONNECT_PIPE_ERR = -3,
|
||||||
UDFC_CODE_LOAD_UDF_FAILURE = -4,
|
UDFC_CODE_LOAD_UDF_FAILURE = -4,
|
||||||
UDFC_CODE_INVALID_STATE = -5
|
UDFC_CODE_INVALID_STATE = -5,
|
||||||
|
UDFC_CODE_NO_PIPE = -6,
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef void *UdfcFuncHandle;
|
typedef void *UdfcFuncHandle;
|
||||||
|
@ -140,6 +141,44 @@ typedef int32_t (*TUdfDestroyFunc)();
|
||||||
|
|
||||||
#define UDF_MEMORY_EXP_GROWTH 1.5
|
#define UDF_MEMORY_EXP_GROWTH 1.5
|
||||||
|
|
||||||
|
#define udfColDataIsNull_var(pColumn, row) ((pColumn->colData.varLenCol.varOffsets)[row] == -1)
|
||||||
|
#define udfColDataIsNull_f(pColumn, row) ((BMCharPos(pColumn->colData.fixLenCol.nullBitmap, row) & (1u << (7u - BitPos(row)))) == (1u << (7u - BitPos(row))))
|
||||||
|
#define udfColDataSetNull_f(pColumn, row) \
|
||||||
|
do { \
|
||||||
|
BMCharPos(pColumn->colData.fixLenCol.nullBitmap, row) |= (1u << (7u - BitPos(row))); \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
#define udfColDataSetNotNull_f(pColumn, r_) \
|
||||||
|
do { \
|
||||||
|
BMCharPos(pColumn->colData.fixLenCol.nullBitmap, r_) &= ~(1u << (7u - BitPos(r_))); \
|
||||||
|
} while (0)
|
||||||
|
#define udfColDataSetNull_var(pColumn, row) ((pColumn->colData.varLenCol.varOffsets)[row] = -1)
|
||||||
|
|
||||||
|
|
||||||
|
static FORCE_INLINE char* udfColDataGetData(const SUdfColumn* pColumn, int32_t row) {
|
||||||
|
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
|
||||||
|
return pColumn->colData.varLenCol.payload + pColumn->colData.varLenCol.varOffsets[row];
|
||||||
|
} else {
|
||||||
|
return pColumn->colData.fixLenCol.data + pColumn->colMeta.bytes * row;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE bool udfColDataIsNull(const SUdfColumn* pColumn, int32_t row) {
|
||||||
|
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
|
||||||
|
if (pColumn->colMeta.type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
if (udfColDataIsNull_var(pColumn, row)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
char* data = udfColDataGetData(pColumn, row);
|
||||||
|
return (*data == TSDB_DATA_TYPE_NULL);
|
||||||
|
} else {
|
||||||
|
return udfColDataIsNull_var(pColumn, row);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return udfColDataIsNull_f(pColumn, row);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t newCapacity) {
|
static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t newCapacity) {
|
||||||
SUdfColumnMeta *meta = &pColumn->colMeta;
|
SUdfColumnMeta *meta = &pColumn->colMeta;
|
||||||
SUdfColumnData *data = &pColumn->colData;
|
SUdfColumnData *data = &pColumn->colData;
|
||||||
|
@ -186,17 +225,22 @@ static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t ne
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t udfColSetRow(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) {
|
static FORCE_INLINE void udfColDataSetNull(SUdfColumn* pColumn, int32_t row) {
|
||||||
|
udfColEnsureCapacity(pColumn, row+1);
|
||||||
|
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
|
||||||
|
udfColDataSetNull_var(pColumn, row);
|
||||||
|
} else {
|
||||||
|
udfColDataSetNull_f(pColumn, row);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int32_t udfColDataSet(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) {
|
||||||
SUdfColumnMeta *meta = &pColumn->colMeta;
|
SUdfColumnMeta *meta = &pColumn->colMeta;
|
||||||
SUdfColumnData *data = &pColumn->colData;
|
SUdfColumnData *data = &pColumn->colData;
|
||||||
udfColEnsureCapacity(pColumn, currentRow+1);
|
udfColEnsureCapacity(pColumn, currentRow+1);
|
||||||
bool isVarCol = IS_VAR_DATA_TYPE(meta->type);
|
bool isVarCol = IS_VAR_DATA_TYPE(meta->type);
|
||||||
if (isNull) {
|
if (isNull) {
|
||||||
if (isVarCol) {
|
udfColDataSetNull(pColumn, currentRow);
|
||||||
data->varLenCol.varOffsets[currentRow] = -1;
|
|
||||||
} else {
|
|
||||||
colDataSetNull_f(data->fixLenCol.nullBitmap, currentRow);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
if (!isVarCol) {
|
if (!isVarCol) {
|
||||||
colDataSetNotNull_f(data->fixLenCol.nullBitmap, currentRow);
|
colDataSetNotNull_f(data->fixLenCol.nullBitmap, currentRow);
|
||||||
|
|
|
@ -146,15 +146,15 @@ typedef struct SUdfdProxy {
|
||||||
|
|
||||||
SUdfdProxy gUdfdProxy = {0};
|
SUdfdProxy gUdfdProxy = {0};
|
||||||
|
|
||||||
typedef struct SUdfUvSession {
|
typedef struct SClientUdfUvSession {
|
||||||
SUdfdProxy *udfc;
|
SUdfdProxy *udfc;
|
||||||
int64_t severHandle;
|
int64_t severHandle;
|
||||||
uv_pipe_t *udfSvcPipe;
|
uv_pipe_t *udfUvPipe;
|
||||||
|
|
||||||
int8_t outputType;
|
int8_t outputType;
|
||||||
int32_t outputLen;
|
int32_t outputLen;
|
||||||
int32_t bufSize;
|
int32_t bufSize;
|
||||||
} SUdfUvSession;
|
} SClientUdfUvSession;
|
||||||
|
|
||||||
typedef struct SClientUvTaskNode {
|
typedef struct SClientUvTaskNode {
|
||||||
SUdfdProxy *udfc;
|
SUdfdProxy *udfc;
|
||||||
|
@ -177,7 +177,7 @@ typedef struct SClientUvTaskNode {
|
||||||
typedef struct SClientUdfTask {
|
typedef struct SClientUdfTask {
|
||||||
int8_t type;
|
int8_t type;
|
||||||
|
|
||||||
SUdfUvSession *session;
|
SClientUdfUvSession *session;
|
||||||
|
|
||||||
int32_t errCode;
|
int32_t errCode;
|
||||||
|
|
||||||
|
@ -209,6 +209,7 @@ typedef struct SClientUvConn {
|
||||||
uv_pipe_t *pipe;
|
uv_pipe_t *pipe;
|
||||||
QUEUE taskQueue;
|
QUEUE taskQueue;
|
||||||
SClientConnBuf readBuf;
|
SClientConnBuf readBuf;
|
||||||
|
SClientUdfUvSession *session;
|
||||||
} SClientUvConn;
|
} SClientUvConn;
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
|
@ -617,18 +618,17 @@ void onUdfcPipeClose(uv_handle_t *handle) {
|
||||||
QUEUE* h = QUEUE_HEAD(&conn->taskQueue);
|
QUEUE* h = QUEUE_HEAD(&conn->taskQueue);
|
||||||
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
|
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
|
||||||
task->errCode = 0;
|
task->errCode = 0;
|
||||||
uv_sem_post(&task->taskSem);
|
|
||||||
QUEUE_REMOVE(&task->procTaskQueue);
|
QUEUE_REMOVE(&task->procTaskQueue);
|
||||||
|
uv_sem_post(&task->taskSem);
|
||||||
}
|
}
|
||||||
|
conn->session->udfUvPipe = NULL;
|
||||||
taosMemoryFree(conn->readBuf.buf);
|
taosMemoryFree(conn->readBuf.buf);
|
||||||
taosMemoryFree(conn);
|
taosMemoryFree(conn);
|
||||||
taosMemoryFree((uv_pipe_t *) handle);
|
taosMemoryFree((uv_pipe_t *) handle);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvTask) {
|
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask) {
|
||||||
fnDebug("udfc get uv task result. task: %p", task);
|
fnDebug("udfc get uv task result. task: %p, uvTask: %p", task, uvTask);
|
||||||
if (uvTask->type == UV_TASK_REQ_RSP) {
|
if (uvTask->type == UV_TASK_REQ_RSP) {
|
||||||
if (uvTask->rspBuf.base != NULL) {
|
if (uvTask->rspBuf.base != NULL) {
|
||||||
SUdfResponse rsp;
|
SUdfResponse rsp;
|
||||||
|
@ -748,8 +748,8 @@ void udfcUvHandleRsp(SClientUvConn *conn) {
|
||||||
if (taskFound) {
|
if (taskFound) {
|
||||||
taskFound->rspBuf = uv_buf_init(connBuf->buf, connBuf->len);
|
taskFound->rspBuf = uv_buf_init(connBuf->buf, connBuf->len);
|
||||||
QUEUE_REMOVE(&taskFound->connTaskQueue);
|
QUEUE_REMOVE(&taskFound->connTaskQueue);
|
||||||
uv_sem_post(&taskFound->taskSem);
|
|
||||||
QUEUE_REMOVE(&taskFound->procTaskQueue);
|
QUEUE_REMOVE(&taskFound->procTaskQueue);
|
||||||
|
uv_sem_post(&taskFound->taskSem);
|
||||||
} else {
|
} else {
|
||||||
fnError("no task is waiting for the response.");
|
fnError("no task is waiting for the response.");
|
||||||
}
|
}
|
||||||
|
@ -764,14 +764,12 @@ void udfcUvHandleError(SClientUvConn *conn) {
|
||||||
QUEUE* h = QUEUE_HEAD(&conn->taskQueue);
|
QUEUE* h = QUEUE_HEAD(&conn->taskQueue);
|
||||||
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
|
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
|
||||||
task->errCode = UDFC_CODE_PIPE_READ_ERR;
|
task->errCode = UDFC_CODE_PIPE_READ_ERR;
|
||||||
uv_sem_post(&task->taskSem);
|
QUEUE_REMOVE(&task->connTaskQueue);
|
||||||
QUEUE_REMOVE(&task->procTaskQueue);
|
QUEUE_REMOVE(&task->procTaskQueue);
|
||||||
|
uv_sem_post(&task->taskSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
uv_close((uv_handle_t *) conn->pipe, NULL);
|
uv_close((uv_handle_t *) conn->pipe, onUdfcPipeClose);
|
||||||
taosMemoryFree(conn->pipe);
|
|
||||||
taosMemoryFree(conn->readBuf.buf);
|
|
||||||
taosMemoryFree(conn);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
|
void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
|
||||||
|
@ -788,9 +786,9 @@ void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
|
||||||
|
|
||||||
}
|
}
|
||||||
if (nread < 0) {
|
if (nread < 0) {
|
||||||
fnError("udfc client pipe %p read error: %s", client, uv_strerror(nread));
|
fnError("udfc client pipe %p read error: %zd, %s.", client, nread, uv_strerror(nread));
|
||||||
if (nread == UV_EOF) {
|
if (nread == UV_EOF) {
|
||||||
fnError("udfc client pipe %p closed", client);
|
fnError("\tudfc client pipe %p closed", client);
|
||||||
}
|
}
|
||||||
udfcUvHandleError(conn);
|
udfcUvHandleError(conn);
|
||||||
}
|
}
|
||||||
|
@ -823,14 +821,14 @@ void onUdfClientConnect(uv_connect_t *connect, int status) {
|
||||||
QUEUE_REMOVE(&uvTask->procTaskQueue);
|
QUEUE_REMOVE(&uvTask->procTaskQueue);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) {
|
int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) {
|
||||||
SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode));
|
SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode));
|
||||||
uvTask->type = uvTaskType;
|
uvTask->type = uvTaskType;
|
||||||
uvTask->udfc = task->session->udfc;
|
uvTask->udfc = task->session->udfc;
|
||||||
|
|
||||||
if (uvTaskType == UV_TASK_CONNECT) {
|
if (uvTaskType == UV_TASK_CONNECT) {
|
||||||
} else if (uvTaskType == UV_TASK_REQ_RSP) {
|
} else if (uvTaskType == UV_TASK_REQ_RSP) {
|
||||||
uvTask->pipe = task->session->udfSvcPipe;
|
uvTask->pipe = task->session->udfUvPipe;
|
||||||
SUdfRequest request;
|
SUdfRequest request;
|
||||||
request.type = task->type;
|
request.type = task->type;
|
||||||
request.seqNum = atomic_fetch_add_64(&gUdfTaskSeqNum, 1);
|
request.seqNum = atomic_fetch_add_64(&gUdfTaskSeqNum, 1);
|
||||||
|
@ -855,7 +853,7 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN
|
||||||
uvTask->reqBuf = uv_buf_init(bufBegin, bufLen);
|
uvTask->reqBuf = uv_buf_init(bufBegin, bufLen);
|
||||||
uvTask->seqNum = request.seqNum;
|
uvTask->seqNum = request.seqNum;
|
||||||
} else if (uvTaskType == UV_TASK_DISCONNECT) {
|
} else if (uvTaskType == UV_TASK_DISCONNECT) {
|
||||||
uvTask->pipe = task->session->udfSvcPipe;
|
uvTask->pipe = task->session->udfUvPipe;
|
||||||
}
|
}
|
||||||
uv_sem_init(&uvTask->taskSem, 0);
|
uv_sem_init(&uvTask->taskSem, 0);
|
||||||
|
|
||||||
|
@ -863,7 +861,7 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) {
|
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) {
|
||||||
fnTrace("queue uv task to event loop, task: %d, %p", uvTask->type, uvTask);
|
fnTrace("queue uv task to event loop, task: %d, %p", uvTask->type, uvTask);
|
||||||
SUdfdProxy *udfc = uvTask->udfc;
|
SUdfdProxy *udfc = uvTask->udfc;
|
||||||
uv_mutex_lock(&udfc->gUdfTaskQueueMutex);
|
uv_mutex_lock(&udfc->gUdfTaskQueueMutex);
|
||||||
|
@ -872,12 +870,13 @@ int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) {
|
||||||
uv_async_send(&udfc->gUdfLoopTaskAync);
|
uv_async_send(&udfc->gUdfLoopTaskAync);
|
||||||
|
|
||||||
uv_sem_wait(&uvTask->taskSem);
|
uv_sem_wait(&uvTask->taskSem);
|
||||||
|
fnInfo("udfc uv task finished. task: %d, %p", uvTask->type, uvTask);
|
||||||
uv_sem_destroy(&uvTask->taskSem);
|
uv_sem_destroy(&uvTask->taskSem);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
|
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
|
||||||
fnTrace("event loop start uv task. task: %d, %p", uvTask->type, uvTask);
|
fnTrace("event loop start uv task. task: %d, %p", uvTask->type, uvTask);
|
||||||
switch (uvTask->type) {
|
switch (uvTask->type) {
|
||||||
case UV_TASK_CONNECT: {
|
case UV_TASK_CONNECT: {
|
||||||
|
@ -885,7 +884,7 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
|
||||||
uv_pipe_init(&uvTask->udfc->gUdfdLoop, pipe, 0);
|
uv_pipe_init(&uvTask->udfc->gUdfdLoop, pipe, 0);
|
||||||
uvTask->pipe = pipe;
|
uvTask->pipe = pipe;
|
||||||
|
|
||||||
SClientUvConn *conn = taosMemoryMalloc(sizeof(SClientUvConn));
|
SClientUvConn *conn = taosMemoryCalloc(1, sizeof(SClientUvConn));
|
||||||
conn->pipe = pipe;
|
conn->pipe = pipe;
|
||||||
conn->readBuf.len = 0;
|
conn->readBuf.len = 0;
|
||||||
conn->readBuf.cap = 0;
|
conn->readBuf.cap = 0;
|
||||||
|
@ -933,13 +932,14 @@ void udfClientAsyncCb(uv_async_t *async) {
|
||||||
QUEUE* h = QUEUE_HEAD(&wq);
|
QUEUE* h = QUEUE_HEAD(&wq);
|
||||||
QUEUE_REMOVE(h);
|
QUEUE_REMOVE(h);
|
||||||
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
|
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
|
||||||
startUvUdfTask(task);
|
udfcStartUvTask(task);
|
||||||
QUEUE_INSERT_TAIL(&udfc->gUvProcTaskQueue, &task->procTaskQueue);
|
QUEUE_INSERT_TAIL(&udfc->gUvProcTaskQueue, &task->procTaskQueue);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void cleanUpUvTasks(SUdfdProxy *udfc) {
|
void cleanUpUvTasks(SUdfdProxy *udfc) {
|
||||||
|
fnDebug("clean up uv tasks")
|
||||||
QUEUE wq;
|
QUEUE wq;
|
||||||
|
|
||||||
uv_mutex_lock(&udfc->gUdfTaskQueueMutex);
|
uv_mutex_lock(&udfc->gUdfTaskQueueMutex);
|
||||||
|
@ -956,7 +956,6 @@ void cleanUpUvTasks(SUdfdProxy *udfc) {
|
||||||
uv_sem_post(&task->taskSem);
|
uv_sem_post(&task->taskSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: deal with tasks that are waiting result.
|
|
||||||
while (!QUEUE_EMPTY(&udfc->gUvProcTaskQueue)) {
|
while (!QUEUE_EMPTY(&udfc->gUvProcTaskQueue)) {
|
||||||
QUEUE* h = QUEUE_HEAD(&udfc->gUvProcTaskQueue);
|
QUEUE* h = QUEUE_HEAD(&udfc->gUvProcTaskQueue);
|
||||||
QUEUE_REMOVE(h);
|
QUEUE_REMOVE(h);
|
||||||
|
@ -1027,14 +1026,16 @@ int32_t udfcClose() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) {
|
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
|
||||||
SClientUvTaskNode *uvTask = NULL;
|
SClientUvTaskNode *uvTask = NULL;
|
||||||
|
|
||||||
createUdfcUvTask(task, uvTaskType, &uvTask);
|
udfcCreateUvTask(task, uvTaskType, &uvTask);
|
||||||
queueUvUdfTask(uvTask);
|
udfcQueueUvTask(uvTask);
|
||||||
udfcGetUvTaskResponseResult(task, uvTask);
|
udfcGetUdfTaskResultFromUvTask(task, uvTask);
|
||||||
if (uvTaskType == UV_TASK_CONNECT) {
|
if (uvTaskType == UV_TASK_CONNECT) {
|
||||||
task->session->udfSvcPipe = uvTask->pipe;
|
task->session->udfUvPipe = uvTask->pipe;
|
||||||
|
SClientUvConn *conn = uvTask->pipe->data;
|
||||||
|
conn->session = task->session;
|
||||||
}
|
}
|
||||||
taosMemoryFree(uvTask);
|
taosMemoryFree(uvTask);
|
||||||
uvTask = NULL;
|
uvTask = NULL;
|
||||||
|
@ -1046,22 +1047,22 @@ int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
|
||||||
if (gUdfdProxy.gUdfcState != UDFC_STATE_READY) {
|
if (gUdfdProxy.gUdfcState != UDFC_STATE_READY) {
|
||||||
return UDFC_CODE_INVALID_STATE;
|
return UDFC_CODE_INVALID_STATE;
|
||||||
}
|
}
|
||||||
SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask));
|
SClientUdfTask *task = taosMemoryCalloc(1,sizeof(SClientUdfTask));
|
||||||
task->errCode = 0;
|
task->errCode = 0;
|
||||||
task->session = taosMemoryMalloc(sizeof(SUdfUvSession));
|
task->session = taosMemoryCalloc(1, sizeof(SClientUdfUvSession));
|
||||||
task->session->udfc = &gUdfdProxy;
|
task->session->udfc = &gUdfdProxy;
|
||||||
task->type = UDF_TASK_SETUP;
|
task->type = UDF_TASK_SETUP;
|
||||||
|
|
||||||
SUdfSetupRequest *req = &task->_setup.req;
|
SUdfSetupRequest *req = &task->_setup.req;
|
||||||
memcpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN);
|
memcpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN);
|
||||||
|
|
||||||
int32_t errCode = udfcRunUvTask(task, UV_TASK_CONNECT);
|
int32_t errCode = udfcRunUdfUvTask(task, UV_TASK_CONNECT);
|
||||||
if (errCode != 0) {
|
if (errCode != 0) {
|
||||||
fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfdProxy)->udfdPipeName);
|
fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfdProxy)->udfdPipeName);
|
||||||
return UDFC_CODE_CONNECT_PIPE_ERR;
|
return UDFC_CODE_CONNECT_PIPE_ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
udfcRunUvTask(task, UV_TASK_REQ_RSP);
|
udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
|
||||||
|
|
||||||
SUdfSetupResponse *rsp = &task->_setup.rsp;
|
SUdfSetupResponse *rsp = &task->_setup.rsp;
|
||||||
task->session->severHandle = rsp->udfHandle;
|
task->session->severHandle = rsp->udfHandle;
|
||||||
|
@ -1082,10 +1083,14 @@ int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
|
||||||
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
|
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
|
||||||
SSDataBlock* output, SUdfInterBuf *newState) {
|
SSDataBlock* output, SUdfInterBuf *newState) {
|
||||||
fnTrace("udfc call udf. callType: %d, funcHandle: %p", callType, handle);
|
fnTrace("udfc call udf. callType: %d, funcHandle: %p", callType, handle);
|
||||||
|
SClientUdfUvSession *session = (SClientUdfUvSession *) handle;
|
||||||
SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask));
|
if (session->udfUvPipe == NULL) {
|
||||||
|
fnError("No pipe to udfd");
|
||||||
|
return UDFC_CODE_NO_PIPE;
|
||||||
|
}
|
||||||
|
SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
|
||||||
task->errCode = 0;
|
task->errCode = 0;
|
||||||
task->session = (SUdfUvSession *) handle;
|
task->session = (SClientUdfUvSession *) handle;
|
||||||
task->type = UDF_TASK_CALL;
|
task->type = UDF_TASK_CALL;
|
||||||
|
|
||||||
SUdfCallRequest *req = &task->_call.req;
|
SUdfCallRequest *req = &task->_call.req;
|
||||||
|
@ -1117,7 +1122,7 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
udfcRunUvTask(task, UV_TASK_REQ_RSP);
|
udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
|
||||||
|
|
||||||
if (task->errCode != 0) {
|
if (task->errCode != 0) {
|
||||||
fnError("call udf failure. err: %d", task->errCode);
|
fnError("call udf failure. err: %d", task->errCode);
|
||||||
|
@ -1145,9 +1150,10 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
|
int err = task->errCode;
|
||||||
taosMemoryFree(task);
|
taosMemoryFree(task);
|
||||||
return task->errCode;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) {
|
int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) {
|
||||||
|
@ -1188,28 +1194,36 @@ int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t nu
|
||||||
convertScalarParamToDataBlock(input, numOfCols, &inputBlock);
|
convertScalarParamToDataBlock(input, numOfCols, &inputBlock);
|
||||||
SSDataBlock resultBlock = {0};
|
SSDataBlock resultBlock = {0};
|
||||||
int32_t err = callUdf(handle, callType, &inputBlock, NULL, NULL, &resultBlock, NULL);
|
int32_t err = callUdf(handle, callType, &inputBlock, NULL, NULL, &resultBlock, NULL);
|
||||||
convertDataBlockToScalarParm(&resultBlock, output);
|
if (err == 0) {
|
||||||
|
convertDataBlockToScalarParm(&resultBlock, output);
|
||||||
|
}
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t teardownUdf(UdfcFuncHandle handle) {
|
int32_t teardownUdf(UdfcFuncHandle handle) {
|
||||||
fnInfo("tear down udf. udf func handle: %p", handle);
|
fnInfo("tear down udf. udf func handle: %p", handle);
|
||||||
|
|
||||||
SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask));
|
SClientUdfUvSession *session = (SClientUdfUvSession *) handle;
|
||||||
|
if (session->udfUvPipe == NULL) {
|
||||||
|
fnError("pipe to udfd does not exist");
|
||||||
|
return UDFC_CODE_NO_PIPE;
|
||||||
|
}
|
||||||
|
|
||||||
|
SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
|
||||||
task->errCode = 0;
|
task->errCode = 0;
|
||||||
task->session = (SUdfUvSession *) handle;
|
task->session = session;
|
||||||
task->type = UDF_TASK_TEARDOWN;
|
task->type = UDF_TASK_TEARDOWN;
|
||||||
|
|
||||||
SUdfTeardownRequest *req = &task->_teardown.req;
|
SUdfTeardownRequest *req = &task->_teardown.req;
|
||||||
req->udfHandle = task->session->severHandle;
|
req->udfHandle = task->session->severHandle;
|
||||||
|
|
||||||
udfcRunUvTask(task, UV_TASK_REQ_RSP);
|
udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
|
||||||
|
|
||||||
SUdfTeardownResponse *rsp = &task->_teardown.rsp;
|
SUdfTeardownResponse *rsp = &task->_teardown.rsp;
|
||||||
|
|
||||||
int32_t err = task->errCode;
|
int32_t err = task->errCode;
|
||||||
|
|
||||||
udfcRunUvTask(task, UV_TASK_DISCONNECT);
|
udfcRunUdfUvTask(task, UV_TASK_DISCONNECT);
|
||||||
|
|
||||||
taosMemoryFree(task->session);
|
taosMemoryFree(task->session);
|
||||||
taosMemoryFree(task);
|
taosMemoryFree(task);
|
||||||
|
@ -1219,7 +1233,7 @@ int32_t teardownUdf(UdfcFuncHandle handle) {
|
||||||
|
|
||||||
//memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
|
//memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
|
||||||
typedef struct SUdfAggRes {
|
typedef struct SUdfAggRes {
|
||||||
SUdfUvSession *session;
|
SClientUdfUvSession *session;
|
||||||
int8_t finalResNum;
|
int8_t finalResNum;
|
||||||
int8_t interResNum;
|
int8_t interResNum;
|
||||||
char* finalResBuf;
|
char* finalResBuf;
|
||||||
|
@ -1242,7 +1256,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
|
||||||
if (setupUdf((char*)pCtx->udfName, &handle) != 0) {
|
if (setupUdf((char*)pCtx->udfName, &handle) != 0) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
SUdfUvSession *session = (SUdfUvSession *)handle;
|
SClientUdfUvSession *session = (SClientUdfUvSession *)handle;
|
||||||
SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo);
|
SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo);
|
||||||
int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize;
|
int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize;
|
||||||
memset(udfRes, 0, envSize);
|
memset(udfRes, 0, envSize);
|
||||||
|
@ -1250,7 +1264,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
|
||||||
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
|
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
|
||||||
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
|
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
|
||||||
|
|
||||||
udfRes->session = (SUdfUvSession *)handle;
|
udfRes->session = (SClientUdfUvSession *)handle;
|
||||||
SUdfInterBuf buf = {0};
|
SUdfInterBuf buf = {0};
|
||||||
if (callUdfAggInit(handle, &buf) != 0) {
|
if (callUdfAggInit(handle, &buf) != 0) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -1265,7 +1279,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
|
||||||
int32_t numOfCols = pInput->numOfInputCols;
|
int32_t numOfCols = pInput->numOfInputCols;
|
||||||
|
|
||||||
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
SUdfUvSession *session = udfRes->session;
|
SClientUdfUvSession *session = udfRes->session;
|
||||||
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
|
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
|
||||||
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
|
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
|
||||||
|
|
||||||
|
@ -1315,7 +1329,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
|
||||||
|
|
||||||
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
|
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
|
||||||
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
SUdfUvSession *session = udfRes->session;
|
SClientUdfUvSession *session = udfRes->session;
|
||||||
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
|
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
|
||||||
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
|
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
|
||||||
|
|
||||||
|
|
|
@ -26,11 +26,18 @@ int32_t udf1(SUdfDataBlock* block, SUdfColumn *resultCol) {
|
||||||
|
|
||||||
SUdfColumnData *resultData = &resultCol->colData;
|
SUdfColumnData *resultData = &resultCol->colData;
|
||||||
resultData->numOfRows = block->numOfRows;
|
resultData->numOfRows = block->numOfRows;
|
||||||
SUdfColumnData *srcData = &block->udfCols[0]->colData;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < resultData->numOfRows; ++i) {
|
for (int32_t i = 0; i < resultData->numOfRows; ++i) {
|
||||||
int32_t luckyNum = 88;
|
int j = 0;
|
||||||
udfColSetRow(resultCol, i, (char*)&luckyNum, false);
|
for (; j < block->numOfCols; ++j) {
|
||||||
|
if (udfColDataIsNull(block->udfCols[j], i)) {
|
||||||
|
udfColDataSetNull(resultCol, i);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ( j == block->numOfCols) {
|
||||||
|
int32_t luckyNum = 88;
|
||||||
|
udfColDataSet(resultCol, i, (char *)&luckyNum, false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -26,24 +26,34 @@ int32_t udf2_start(SUdfInterBuf *buf) {
|
||||||
|
|
||||||
int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
|
int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
|
||||||
int64_t sumSquares = *(int64_t*)interBuf->buf;
|
int64_t sumSquares = *(int64_t*)interBuf->buf;
|
||||||
|
int8_t numOutput = 0;
|
||||||
for (int32_t i = 0; i < block->numOfCols; ++i) {
|
for (int32_t i = 0; i < block->numOfCols; ++i) {
|
||||||
for (int32_t j = 0; j < block->numOfRows; ++j) {
|
for (int32_t j = 0; j < block->numOfRows; ++j) {
|
||||||
SUdfColumn* col = block->udfCols[i];
|
SUdfColumn* col = block->udfCols[i];
|
||||||
//TODO: check the bitmap for null value
|
if (udfColDataIsNull(col, j)) {
|
||||||
int32_t* rows = (int32_t*)col->colData.fixLenCol.data;
|
continue;
|
||||||
sumSquares += rows[j] * rows[j];
|
}
|
||||||
|
|
||||||
|
char* cell = udfColDataGetData(col, j);
|
||||||
|
int32_t num = *(int32_t*)cell;
|
||||||
|
sumSquares += num * num;
|
||||||
|
numOutput = 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
*(int64_t*)(newInterBuf->buf) = sumSquares;
|
if (numOutput == 1) {
|
||||||
newInterBuf->bufLen = sizeof(int64_t);
|
*(int64_t*)(newInterBuf->buf) = sumSquares;
|
||||||
//TODO: if all null value, numOfResult = 0;
|
newInterBuf->bufLen = sizeof(int64_t);
|
||||||
newInterBuf->numOfResult = 1;
|
}
|
||||||
|
newInterBuf->numOfResult = numOutput;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t udf2_finish(SUdfInterBuf* buf, SUdfInterBuf *resultData) {
|
int32_t udf2_finish(SUdfInterBuf* buf, SUdfInterBuf *resultData) {
|
||||||
//TODO: check numOfResults;
|
if (buf->numOfResult == 0) {
|
||||||
|
resultData->numOfResult = 0;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
int64_t sumSquares = *(int64_t*)(buf->buf);
|
int64_t sumSquares = *(int64_t*)(buf->buf);
|
||||||
*(double*)(resultData->buf) = sqrt(sumSquares);
|
*(double*)(resultData->buf) = sqrt(sumSquares);
|
||||||
resultData->bufLen = sizeof(double);
|
resultData->bufLen = sizeof(double);
|
||||||
|
|
|
@ -66,4 +66,4 @@ endi
|
||||||
|
|
||||||
#sql drop function udf1;
|
#sql drop function udf1;
|
||||||
#sql drop function udf2;
|
#sql drop function udf2;
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
|
system sh/exec.sh -n dnode1 -s stop -x SIGTERM
|
||||||
|
|
Loading…
Reference in New Issue