fix: add expired udfc func stub to track the expired
This commit is contained in:
parent
33143e90c8
commit
fe718f60ee
|
@ -363,6 +363,7 @@ typedef struct SUdfcProxy {
|
||||||
|
|
||||||
uv_mutex_t udfStubsMutex;
|
uv_mutex_t udfStubsMutex;
|
||||||
SArray *udfStubs; // SUdfcFuncStub
|
SArray *udfStubs; // SUdfcFuncStub
|
||||||
|
SArray *expiredUdfStubs; //SUdfcFuncStub
|
||||||
|
|
||||||
uv_mutex_t udfcUvMutex;
|
uv_mutex_t udfcUvMutex;
|
||||||
int8_t initialized;
|
int8_t initialized;
|
||||||
|
@ -983,7 +984,9 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
|
||||||
SUdfcFuncStub *foundStub = taosArrayGet(gUdfcProxy.udfStubs, stubIndex);
|
SUdfcFuncStub *foundStub = taosArrayGet(gUdfcProxy.udfStubs, stubIndex);
|
||||||
UdfcFuncHandle handle = foundStub->handle;
|
UdfcFuncHandle handle = foundStub->handle;
|
||||||
int64_t currUs = taosGetTimestampUs();
|
int64_t currUs = taosGetTimestampUs();
|
||||||
if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL && (currUs - foundStub->createTime) < 10 * 1000 * 1000) {
|
bool expired = (currUs - foundStub->createTime) >= 10 * 1000 * 1000;
|
||||||
|
if (!expired) {
|
||||||
|
if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
|
||||||
*pHandle = foundStub->handle;
|
*pHandle = foundStub->handle;
|
||||||
++foundStub->refCount;
|
++foundStub->refCount;
|
||||||
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
|
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
|
||||||
|
@ -993,6 +996,11 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
|
||||||
foundStub->refCount, foundStub->createTime);
|
foundStub->refCount, foundStub->createTime);
|
||||||
taosArrayRemove(gUdfcProxy.udfStubs, stubIndex);
|
taosArrayRemove(gUdfcProxy.udfStubs, stubIndex);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
taosArrayRemove(gUdfcProxy.udfStubs, stubIndex);
|
||||||
|
taosArrayPush(gUdfcProxy.expiredUdfStubs, foundStub);
|
||||||
|
taosArraySort(gUdfcProxy.expiredUdfStubs, compareUdfcFuncSub);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
*pHandle = NULL;
|
*pHandle = NULL;
|
||||||
code = doSetupUdf(udfName, pHandle);
|
code = doSetupUdf(udfName, pHandle);
|
||||||
|
@ -1017,13 +1025,17 @@ void releaseUdfFuncHandle(char *udfName) {
|
||||||
SUdfcFuncStub key = {0};
|
SUdfcFuncStub key = {0};
|
||||||
strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN);
|
strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN);
|
||||||
SUdfcFuncStub *foundStub = taosArraySearch(gUdfcProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
|
SUdfcFuncStub *foundStub = taosArraySearch(gUdfcProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
|
||||||
if (!foundStub) {
|
SUdfcFuncStub *expiredStub = taosArraySearch(gUdfcProxy.expiredUdfStubs, &key, compareUdfcFuncSub, TD_EQ);
|
||||||
|
if (!foundStub && !expiredStub) {
|
||||||
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
|
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (foundStub->refCount > 0) {
|
if (foundStub != NULL && foundStub->refCount > 0) {
|
||||||
--foundStub->refCount;
|
--foundStub->refCount;
|
||||||
}
|
}
|
||||||
|
if (expiredStub != NULL && expiredStub->refCount > 0) {
|
||||||
|
--expiredStub->refCount;
|
||||||
|
}
|
||||||
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
|
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1060,6 +1072,28 @@ int32_t cleanUpUdfs() {
|
||||||
}
|
}
|
||||||
taosArrayDestroy(gUdfcProxy.udfStubs);
|
taosArrayDestroy(gUdfcProxy.udfStubs);
|
||||||
gUdfcProxy.udfStubs = udfStubs;
|
gUdfcProxy.udfStubs = udfStubs;
|
||||||
|
|
||||||
|
SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
|
||||||
|
while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) {
|
||||||
|
SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i);
|
||||||
|
if (stub->refCount == 0) {
|
||||||
|
fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
|
||||||
|
doTeardownUdf(stub->handle);
|
||||||
|
} else {
|
||||||
|
fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName,
|
||||||
|
stub->refCount, stub->createTime, stub->handle);
|
||||||
|
UdfcFuncHandle handle = stub->handle;
|
||||||
|
if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
|
||||||
|
taosArrayPush(expiredUdfStubs, stub);
|
||||||
|
} else {
|
||||||
|
fnInfo("udf invalid handle for %s, expired. refCount: %d, create time: %" PRId64 ". remove it from cache",
|
||||||
|
stub->udfName, stub->refCount, stub->createTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
++i;
|
||||||
|
}
|
||||||
|
taosArrayDestroy(gUdfcProxy.udfStubs);
|
||||||
|
gUdfcProxy.expiredUdfStubs = expiredUdfStubs;
|
||||||
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
|
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1663,6 +1697,7 @@ int32_t udfcOpen() {
|
||||||
uv_barrier_wait(&proxy->initBarrier);
|
uv_barrier_wait(&proxy->initBarrier);
|
||||||
uv_mutex_init(&proxy->udfStubsMutex);
|
uv_mutex_init(&proxy->udfStubsMutex);
|
||||||
proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
|
proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
|
||||||
|
proxy->expiredUdfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
|
||||||
uv_mutex_init(&proxy->udfcUvMutex);
|
uv_mutex_init(&proxy->udfcUvMutex);
|
||||||
fnInfo("udfc initialized") return 0;
|
fnInfo("udfc initialized") return 0;
|
||||||
}
|
}
|
||||||
|
@ -1679,6 +1714,7 @@ int32_t udfcClose() {
|
||||||
uv_thread_join(&udfc->loopThread);
|
uv_thread_join(&udfc->loopThread);
|
||||||
uv_mutex_destroy(&udfc->taskQueueMutex);
|
uv_mutex_destroy(&udfc->taskQueueMutex);
|
||||||
uv_barrier_destroy(&udfc->initBarrier);
|
uv_barrier_destroy(&udfc->initBarrier);
|
||||||
|
taosArrayDestroy(udfc->expiredUdfStubs);
|
||||||
taosArrayDestroy(udfc->udfStubs);
|
taosArrayDestroy(udfc->udfStubs);
|
||||||
uv_mutex_destroy(&udfc->udfStubsMutex);
|
uv_mutex_destroy(&udfc->udfStubsMutex);
|
||||||
uv_mutex_destroy(&udfc->udfcUvMutex);
|
uv_mutex_destroy(&udfc->udfcUvMutex);
|
||||||
|
|
Loading…
Reference in New Issue