refactor(log): add some logs.

This commit is contained in:
Haojun Liao 2023-04-29 00:00:25 +08:00
parent c647ded0da
commit b69137df44
2 changed files with 6 additions and 5 deletions

View File

@ -172,7 +172,7 @@ void tqNotifyClose(STQ* pTq) {
int64_t st = taosGetTimestampMs();
qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
int64_t el = taosGetTimestampMs() - st;
tqDebug("vgId:%d s-task:%s is closed in %" PRId64 "ms", pTq->pStreamMeta->vgId, pTask->id.idStr, el);
tqDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pTq->pStreamMeta->vgId, pTask->id.idStr, el);
}
taosWUnLockLatch(&pTq->pStreamMeta->lock);

View File

@ -52,7 +52,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
}
_hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
pMeta->pTasks = taosHashInit(64, fp, true, HASH_ENTRY_LOCK);
pMeta->pTasks = taosHashInit(64, fp, true, HASH_NO_LOCK);
if (pMeta->pTasks == NULL) {
goto _err;
}
@ -215,6 +215,8 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
}
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
taosWLockLatch(&pMeta->lock);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) {
SStreamTask* pTask = *ppTask;
@ -222,11 +224,10 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP);
taosWLockLatch(&pMeta->lock);
streamMetaReleaseTask(pMeta, pTask);
taosWUnLockLatch(&pMeta->lock);
}
taosWUnLockLatch(&pMeta->lock);
}
int32_t streamMetaBegin(SStreamMeta* pMeta) {