fix: job destroy issue

This commit is contained in:
dapan1121 2024-11-21 18:02:42 +08:00
parent 2738d5dd9c
commit 205d47d981
2 changed files with 39 additions and 5 deletions

View File

@ -78,12 +78,13 @@ int32_t qwInitJobHash(void) {
void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session) {
char id[sizeof(tId) + sizeof(eId) + 1] = {0};
QW_SET_TEID(id, tId, eId);
int32_t remainSessions = atomic_sub_fetch_32(&pJobInfo->memInfo->remainSession, 1);
(void)taosHashRemove(pJobInfo->pSessions, id, sizeof(id));
taosMemPoolDestroySession(gMemPoolHandle, session);
int32_t remainSessions = atomic_sub_fetch_32(&pJobInfo->memInfo->remainSession, 1);
if (0 == remainSessions) {
QW_LOCK(QW_WRITE, &pJobInfo->lock);
if (0 == taosHashGetSize(pJobInfo->pSessions) && 0 == atomic_load_32(&pJobInfo->memInfo->remainSession)) {

View File

@ -735,7 +735,7 @@ void mptRetireJobsCb(int64_t retireSize, int32_t errCode) {
int64_t retiredSize = 0;
while (retiredSize < retireSize && NULL != pJob) {
if (atomic_load_8(&pJob->retired) || 0 == atomic_load_8(&pJob->initDone)) {
pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, NULL);
pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, pJob);
continue;
}
@ -753,7 +753,7 @@ void mptRetireJobsCb(int64_t retireSize, int32_t errCode) {
jobId, aSize, retireSize, retiredSize);
}
pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, NULL);
pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, pJob);
}
taosHashCancelIterate(mptCtx.pJobs, pJob);
@ -1492,8 +1492,8 @@ void mptPrintTestBeginInfo(char* caseName, SMPTestParam* param) {
#if 1
#if 0
TEST(FuncTest, SysMemoryPerfTest) {
char* caseName = "FuncTest:SingleThreadTest";
TEST(PerfTest, GetSysAvail) {
char* caseName = "PerfTest:GetSysAvail";
int32_t code = 0;
int64_t msize = 1048576UL*10240;
@ -1518,6 +1518,39 @@ TEST(FuncTest, SysMemoryPerfTest) {
}
#endif
#if 0
TEST(PerfTest, allocLatency) {
char* caseName = "PerfTest:allocLatency";
int32_t code = 0;
int64_t msize = 10;
char* p = NULL;
void* pSession = NULL;
void* pJob = NULL;
mptInitPool();
memset(mptCtx.jobCtxs, 0, sizeof(*mptCtx.jobCtxs));
assert(0 == taosMemPoolCallocJob(0, 0, (void**)&pJob));
assert(0 == taosMemPoolInitSession(gMemPoolHandle, &pSession, pJob, "id"));
int32_t loopTimes = 1000000;
int64_t st = taosGetTimestampUs();
mptEnableMemoryPoolUsage(gMemPoolHandle, pSession);
for (int32_t i = 0; i < loopTimes; ++i) {
p = mptMemoryMalloc(msize);
assert(0 == code);
}
int64_t totalUs = taosGetTimestampUs() - st;
mptDisableMemoryPoolUsage();
printf("%d times alloc %" PRId64 " bytes, total time:%" PRId64 "us, avg:%dus\n", loopTimes, totalUs, totalUs/loopTimes);
}
#endif
#if 0
TEST(FuncTest, SingleThreadTest) {
char* caseName = "FuncTest:SingleThreadTest";