Merge pull request #17524 from taosdata/szhou/fixbugs
fix: add udf call and uv synchronization with uv_mutex
This commit is contained in:
commit
67a3c69b5e
|
@ -339,10 +339,11 @@ typedef struct SUdfcProxy {
|
|||
uv_mutex_t udfStubsMutex;
|
||||
SArray *udfStubs; // SUdfcFuncStub
|
||||
|
||||
uv_mutex_t udfcUvMutex;
|
||||
int8_t initialized;
|
||||
} SUdfcProxy;
|
||||
|
||||
SUdfcProxy gUdfdProxy = {0};
|
||||
SUdfcProxy gUdfcProxy = {0};
|
||||
|
||||
typedef struct SUdfcUvSession {
|
||||
SUdfcProxy *udfc;
|
||||
|
@ -896,23 +897,23 @@ int compareUdfcFuncSub(const void *elem1, const void *elem2) {
|
|||
|
||||
int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
|
||||
int32_t code = 0;
|
||||
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
|
||||
uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
|
||||
SUdfcFuncStub key = {0};
|
||||
strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN);
|
||||
int32_t stubIndex = taosArraySearchIdx(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
|
||||
int32_t stubIndex = taosArraySearchIdx(gUdfcProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
|
||||
if (stubIndex != -1) {
|
||||
SUdfcFuncStub *foundStub = taosArrayGet(gUdfdProxy.udfStubs, stubIndex);
|
||||
SUdfcFuncStub *foundStub = taosArrayGet(gUdfcProxy.udfStubs, stubIndex);
|
||||
UdfcFuncHandle handle = foundStub->handle;
|
||||
if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
|
||||
*pHandle = foundStub->handle;
|
||||
++foundStub->refCount;
|
||||
foundStub->lastRefTime = taosGetTimestampUs();
|
||||
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
||||
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
|
||||
return 0;
|
||||
} else {
|
||||
fnInfo("invalid handle for %s, refCount: %d, last ref time: %" PRId64 ". remove it from cache", udfName,
|
||||
foundStub->refCount, foundStub->lastRefTime);
|
||||
taosArrayRemove(gUdfdProxy.udfStubs, stubIndex);
|
||||
taosArrayRemove(gUdfcProxy.udfStubs, stubIndex);
|
||||
}
|
||||
}
|
||||
*pHandle = NULL;
|
||||
|
@ -923,46 +924,46 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
|
|||
stub.handle = *pHandle;
|
||||
++stub.refCount;
|
||||
stub.lastRefTime = taosGetTimestampUs();
|
||||
taosArrayPush(gUdfdProxy.udfStubs, &stub);
|
||||
taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub);
|
||||
taosArrayPush(gUdfcProxy.udfStubs, &stub);
|
||||
taosArraySort(gUdfcProxy.udfStubs, compareUdfcFuncSub);
|
||||
} else {
|
||||
*pHandle = NULL;
|
||||
}
|
||||
|
||||
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
||||
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
|
||||
return code;
|
||||
}
|
||||
|
||||
void releaseUdfFuncHandle(char *udfName) {
|
||||
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
|
||||
uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
|
||||
SUdfcFuncStub key = {0};
|
||||
strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN);
|
||||
SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
|
||||
SUdfcFuncStub *foundStub = taosArraySearch(gUdfcProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
|
||||
if (!foundStub) {
|
||||
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
||||
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
|
||||
return;
|
||||
}
|
||||
if (foundStub->refCount > 0) {
|
||||
--foundStub->refCount;
|
||||
}
|
||||
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
||||
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
|
||||
}
|
||||
|
||||
int32_t cleanUpUdfs() {
|
||||
int8_t initialized = atomic_load_8(&gUdfdProxy.initialized);
|
||||
int8_t initialized = atomic_load_8(&gUdfcProxy.initialized);
|
||||
if (!initialized) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
|
||||
if (gUdfdProxy.udfStubs == NULL || taosArrayGetSize(gUdfdProxy.udfStubs) == 0) {
|
||||
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
||||
uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
|
||||
if (gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) {
|
||||
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
SArray *udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
|
||||
int32_t i = 0;
|
||||
while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) {
|
||||
SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i);
|
||||
while (i < taosArrayGetSize(gUdfcProxy.udfStubs)) {
|
||||
SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.udfStubs, i);
|
||||
if (stub->refCount == 0) {
|
||||
fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
|
||||
doTeardownUdf(stub->handle);
|
||||
|
@ -979,9 +980,9 @@ int32_t cleanUpUdfs() {
|
|||
}
|
||||
++i;
|
||||
}
|
||||
taosArrayDestroy(gUdfdProxy.udfStubs);
|
||||
gUdfdProxy.udfStubs = udfStubs;
|
||||
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
||||
taosArrayDestroy(gUdfcProxy.udfStubs);
|
||||
gUdfcProxy.udfStubs = udfStubs;
|
||||
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1157,9 +1158,11 @@ void onUdfcPipeClose(uv_handle_t *handle) {
|
|||
QUEUE_REMOVE(&task->procTaskQueue);
|
||||
uv_sem_post(&task->taskSem);
|
||||
}
|
||||
uv_mutex_lock(&gUdfcProxy.udfcUvMutex);
|
||||
if (conn->session != NULL) {
|
||||
conn->session->udfUvPipe = NULL;
|
||||
}
|
||||
uv_mutex_unlock(&gUdfcProxy.udfcUvMutex);
|
||||
taosMemoryFree(conn->readBuf.buf);
|
||||
taosMemoryFree(conn);
|
||||
taosMemoryFree((uv_pipe_t *)handle);
|
||||
|
@ -1553,11 +1556,11 @@ void constructUdfService(void *argsThread) {
|
|||
}
|
||||
|
||||
int32_t udfcOpen() {
|
||||
int8_t old = atomic_val_compare_exchange_8(&gUdfdProxy.initialized, 0, 1);
|
||||
int8_t old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 0, 1);
|
||||
if (old == 1) {
|
||||
return 0;
|
||||
}
|
||||
SUdfcProxy *proxy = &gUdfdProxy;
|
||||
SUdfcProxy *proxy = &gUdfcProxy;
|
||||
getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName));
|
||||
proxy->udfcState = UDFC_STATE_STARTNG;
|
||||
uv_barrier_init(&proxy->initBarrier, 2);
|
||||
|
@ -1567,16 +1570,17 @@ int32_t udfcOpen() {
|
|||
uv_barrier_wait(&proxy->initBarrier);
|
||||
uv_mutex_init(&proxy->udfStubsMutex);
|
||||
proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
|
||||
uv_mutex_init(&proxy->udfcUvMutex);
|
||||
fnInfo("udfc initialized") return 0;
|
||||
}
|
||||
|
||||
int32_t udfcClose() {
|
||||
int8_t old = atomic_val_compare_exchange_8(&gUdfdProxy.initialized, 1, 0);
|
||||
int8_t old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 1, 0);
|
||||
if (old == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
SUdfcProxy *udfc = &gUdfdProxy;
|
||||
SUdfcProxy *udfc = &gUdfcProxy;
|
||||
udfc->udfcState = UDFC_STATE_STOPPING;
|
||||
uv_async_send(&udfc->loopStopAsync);
|
||||
uv_thread_join(&udfc->loopThread);
|
||||
|
@ -1584,6 +1588,7 @@ int32_t udfcClose() {
|
|||
uv_barrier_destroy(&udfc->initBarrier);
|
||||
taosArrayDestroy(udfc->udfStubs);
|
||||
uv_mutex_destroy(&udfc->udfStubsMutex);
|
||||
uv_mutex_destroy(&udfc->udfcUvMutex);
|
||||
udfc->udfcState = UDFC_STATE_INITAL;
|
||||
fnInfo("udfc is cleaned up");
|
||||
return 0;
|
||||
|
@ -1611,13 +1616,13 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
|
|||
}
|
||||
|
||||
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
|
||||
if (gUdfdProxy.udfcState != UDFC_STATE_READY) {
|
||||
if (gUdfcProxy.udfcState != UDFC_STATE_READY) {
|
||||
return TSDB_CODE_UDF_INVALID_STATE;
|
||||
}
|
||||
SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
|
||||
task->errCode = 0;
|
||||
task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession));
|
||||
task->session->udfc = &gUdfdProxy;
|
||||
task->session->udfc = &gUdfcProxy;
|
||||
task->type = UDF_TASK_SETUP;
|
||||
|
||||
SUdfSetupRequest *req = &task->_setup.req;
|
||||
|
@ -1625,7 +1630,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
|
|||
|
||||
int32_t errCode = udfcRunUdfUvTask(task, UV_TASK_CONNECT);
|
||||
if (errCode != 0) {
|
||||
fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfdProxy)->udfdPipeName);
|
||||
fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfcProxy)->udfdPipeName);
|
||||
taosMemoryFree(task->session);
|
||||
taosMemoryFree(task);
|
||||
return TSDB_CODE_UDF_PIPE_CONNECT_ERR;
|
||||
|
@ -1799,10 +1804,12 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
|
|||
|
||||
fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle);
|
||||
// TODO: synchronization refactor between libuv event loop and request thread
|
||||
uv_mutex_lock(&gUdfcProxy.udfcUvMutex);
|
||||
if (session->udfUvPipe != NULL && session->udfUvPipe->data != NULL) {
|
||||
SClientUvConn *conn = session->udfUvPipe->data;
|
||||
conn->session = NULL;
|
||||
}
|
||||
uv_mutex_unlock(&gUdfcProxy.udfcUvMutex);
|
||||
taosMemoryFree(session);
|
||||
taosMemoryFree(task);
|
||||
|
||||
|
|
Loading…
Reference in New Issue