enhance: add udf stub expiration of 10s after creation
This commit is contained in:
parent
218f8d30a1
commit
33143e90c8
|
@ -343,7 +343,7 @@ typedef struct SUdfcFuncStub {
|
||||||
char udfName[TSDB_FUNC_NAME_LEN + 1];
|
char udfName[TSDB_FUNC_NAME_LEN + 1];
|
||||||
UdfcFuncHandle handle;
|
UdfcFuncHandle handle;
|
||||||
int32_t refCount;
|
int32_t refCount;
|
||||||
int64_t lastRefTime;
|
int64_t createTime;
|
||||||
} SUdfcFuncStub;
|
} SUdfcFuncStub;
|
||||||
|
|
||||||
typedef struct SUdfcProxy {
|
typedef struct SUdfcProxy {
|
||||||
|
@ -982,15 +982,15 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
|
||||||
if (stubIndex != -1) {
|
if (stubIndex != -1) {
|
||||||
SUdfcFuncStub *foundStub = taosArrayGet(gUdfcProxy.udfStubs, stubIndex);
|
SUdfcFuncStub *foundStub = taosArrayGet(gUdfcProxy.udfStubs, stubIndex);
|
||||||
UdfcFuncHandle handle = foundStub->handle;
|
UdfcFuncHandle handle = foundStub->handle;
|
||||||
if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
|
int64_t currUs = taosGetTimestampUs();
|
||||||
|
if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL && (currUs - foundStub->createTime) < 10 * 1000 * 1000) {
|
||||||
*pHandle = foundStub->handle;
|
*pHandle = foundStub->handle;
|
||||||
++foundStub->refCount;
|
++foundStub->refCount;
|
||||||
foundStub->lastRefTime = taosGetTimestampUs();
|
|
||||||
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
|
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
fnInfo("invalid handle for %s, refCount: %d, last ref time: %" PRId64 ". remove it from cache", udfName,
|
fnInfo("invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", udfName,
|
||||||
foundStub->refCount, foundStub->lastRefTime);
|
foundStub->refCount, foundStub->createTime);
|
||||||
taosArrayRemove(gUdfcProxy.udfStubs, stubIndex);
|
taosArrayRemove(gUdfcProxy.udfStubs, stubIndex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1001,7 +1001,7 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
|
||||||
strncpy(stub.udfName, udfName, TSDB_FUNC_NAME_LEN);
|
strncpy(stub.udfName, udfName, TSDB_FUNC_NAME_LEN);
|
||||||
stub.handle = *pHandle;
|
stub.handle = *pHandle;
|
||||||
++stub.refCount;
|
++stub.refCount;
|
||||||
stub.lastRefTime = taosGetTimestampUs();
|
stub.createTime = taosGetTimestampUs();
|
||||||
taosArrayPush(gUdfcProxy.udfStubs, &stub);
|
taosArrayPush(gUdfcProxy.udfStubs, &stub);
|
||||||
taosArraySort(gUdfcProxy.udfStubs, compareUdfcFuncSub);
|
taosArraySort(gUdfcProxy.udfStubs, compareUdfcFuncSub);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1046,14 +1046,14 @@ int32_t cleanUpUdfs() {
|
||||||
fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
|
fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
|
||||||
doTeardownUdf(stub->handle);
|
doTeardownUdf(stub->handle);
|
||||||
} else {
|
} else {
|
||||||
fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %" PRId64 ", handle: %p", stub->udfName,
|
fnInfo("udf still in use. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName,
|
||||||
stub->refCount, stub->lastRefTime, stub->handle);
|
stub->refCount, stub->createTime, stub->handle);
|
||||||
UdfcFuncHandle handle = stub->handle;
|
UdfcFuncHandle handle = stub->handle;
|
||||||
if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
|
if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
|
||||||
taosArrayPush(udfStubs, stub);
|
taosArrayPush(udfStubs, stub);
|
||||||
} else {
|
} else {
|
||||||
fnInfo("udf invalid handle for %s, refCount: %d, last ref time: %" PRId64 ". remove it from cache",
|
fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache",
|
||||||
stub->udfName, stub->refCount, stub->lastRefTime);
|
stub->udfName, stub->refCount, stub->createTime);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
++i;
|
++i;
|
||||||
|
|
Loading…
Reference in New Issue