Merge pull request #24948 from taosdata/fix/nullcheck
fix(util): fix the new size remaining bug.
This commit is contained in:
commit
4fb9595744
|
@ -72,40 +72,6 @@ struct STaosQnode {
|
||||||
char item[];
|
char item[];
|
||||||
};
|
};
|
||||||
|
|
||||||
struct STaosQueue {
|
|
||||||
STaosQnode *head;
|
|
||||||
STaosQnode *tail;
|
|
||||||
STaosQueue *next; // for queue set
|
|
||||||
STaosQset *qset; // for queue set
|
|
||||||
void *ahandle; // for queue set
|
|
||||||
FItem itemFp;
|
|
||||||
FItems itemsFp;
|
|
||||||
TdThreadMutex mutex;
|
|
||||||
int64_t memOfItems;
|
|
||||||
int32_t numOfItems;
|
|
||||||
int64_t threadId;
|
|
||||||
int64_t memLimit;
|
|
||||||
int64_t itemLimit;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct STaosQset {
|
|
||||||
STaosQueue *head;
|
|
||||||
STaosQueue *current;
|
|
||||||
TdThreadMutex mutex;
|
|
||||||
tsem_t sem;
|
|
||||||
int32_t numOfQueues;
|
|
||||||
int32_t numOfItems;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct STaosQall {
|
|
||||||
STaosQnode *current;
|
|
||||||
STaosQnode *start;
|
|
||||||
int32_t numOfItems;
|
|
||||||
int64_t memOfItems;
|
|
||||||
int32_t unAccessedNumOfItems;
|
|
||||||
int64_t unAccessMemOfItems;
|
|
||||||
};
|
|
||||||
|
|
||||||
STaosQueue *taosOpenQueue();
|
STaosQueue *taosOpenQueue();
|
||||||
void taosCloseQueue(STaosQueue *queue);
|
void taosCloseQueue(STaosQueue *queue);
|
||||||
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp);
|
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp);
|
||||||
|
@ -140,6 +106,8 @@ int32_t taosGetQueueNumber(STaosQset *qset);
|
||||||
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo);
|
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo);
|
||||||
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo);
|
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo);
|
||||||
void taosResetQsetThread(STaosQset *qset, void *pItem);
|
void taosResetQsetThread(STaosQset *qset, void *pItem);
|
||||||
|
void taosQueueSetThreadId(STaosQueue *pQueue, int64_t threadId);
|
||||||
|
int64_t taosQueueGetThreadId(STaosQueue *pQueue);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -876,12 +876,13 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
|
||||||
STaosQall* qall = taosAllocateQall();
|
STaosQall* qall = taosAllocateQall();
|
||||||
taosReadAllQitems(pTmq->delayedTask, qall);
|
taosReadAllQitems(pTmq->delayedTask, qall);
|
||||||
|
|
||||||
if (qall->numOfItems == 0) {
|
int32_t numOfItems = taosQallItemSize(qall);
|
||||||
|
if (numOfItems == 0) {
|
||||||
taosFreeQall(qall);
|
taosFreeQall(qall);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems);
|
tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems);
|
||||||
int8_t* pTaskType = NULL;
|
int8_t* pTaskType = NULL;
|
||||||
taosGetQitem(qall, (void**)&pTaskType);
|
taosGetQitem(qall, (void**)&pTaskType);
|
||||||
|
|
||||||
|
@ -1839,7 +1840,7 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal
|
||||||
}
|
}
|
||||||
|
|
||||||
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
||||||
tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems);
|
tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall));
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SMqRspWrapper* pRspWrapper = NULL;
|
SMqRspWrapper* pRspWrapper = NULL;
|
||||||
|
|
|
@ -194,26 +194,26 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
|
||||||
while (pVnode->refCount > 0) taosMsleep(10);
|
while (pVnode->refCount > 0) taosMsleep(10);
|
||||||
|
|
||||||
dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
|
dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
|
||||||
pVnode->pWriteW.queue->threadId);
|
taosQueueGetThreadId(pVnode->pWriteW.queue));
|
||||||
tMultiWorkerCleanup(&pVnode->pWriteW);
|
tMultiWorkerCleanup(&pVnode->pWriteW);
|
||||||
|
|
||||||
dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
|
dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
|
||||||
pVnode->pSyncW.queue->threadId);
|
taosQueueGetThreadId(pVnode->pSyncW.queue));
|
||||||
tMultiWorkerCleanup(&pVnode->pSyncW);
|
tMultiWorkerCleanup(&pVnode->pSyncW);
|
||||||
|
|
||||||
dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
|
dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
|
||||||
pVnode->pSyncRdW.queue->threadId);
|
taosQueueGetThreadId(pVnode->pSyncRdW.queue));
|
||||||
tMultiWorkerCleanup(&pVnode->pSyncRdW);
|
tMultiWorkerCleanup(&pVnode->pSyncRdW);
|
||||||
|
|
||||||
dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
|
dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
|
||||||
pVnode->pApplyW.queue->threadId);
|
taosQueueGetThreadId(pVnode->pApplyW.queue));
|
||||||
tMultiWorkerCleanup(&pVnode->pApplyW);
|
tMultiWorkerCleanup(&pVnode->pApplyW);
|
||||||
|
|
||||||
dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
|
dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
|
||||||
while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
|
while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
|
||||||
|
|
||||||
dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
|
dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
|
||||||
pVnode->pFetchQ->threadId);
|
taosQueueGetThreadId(pVnode->pFetchQ));
|
||||||
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
|
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
|
||||||
|
|
||||||
tqNotifyClose(pVnode->pImpl->pTq);
|
tqNotifyClose(pVnode->pImpl->pTq);
|
||||||
|
|
|
@ -365,16 +365,16 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
dInfo("vgId:%d, write-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
|
dInfo("vgId:%d, write-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
|
||||||
pVnode->pWriteW.queue->threadId);
|
taosQueueGetThreadId(pVnode->pWriteW.queue));
|
||||||
dInfo("vgId:%d, sync-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
|
dInfo("vgId:%d, sync-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
|
||||||
pVnode->pSyncW.queue->threadId);
|
taosQueueGetThreadId(pVnode->pSyncW.queue));
|
||||||
dInfo("vgId:%d, sync-rd-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
|
dInfo("vgId:%d, sync-rd-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
|
||||||
pVnode->pSyncRdW.queue->threadId);
|
taosQueueGetThreadId(pVnode->pSyncRdW.queue));
|
||||||
dInfo("vgId:%d, apply-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
|
dInfo("vgId:%d, apply-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
|
||||||
pVnode->pApplyW.queue->threadId);
|
taosQueueGetThreadId(pVnode->pApplyW.queue));
|
||||||
dInfo("vgId:%d, query-queue:%p is alloced", pVnode->vgId, pVnode->pQueryQ);
|
dInfo("vgId:%d, query-queue:%p is alloced", pVnode->vgId, pVnode->pQueryQ);
|
||||||
dInfo("vgId:%d, fetch-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
|
dInfo("vgId:%d, fetch-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
|
||||||
pVnode->pFetchQ->threadId);
|
taosQueueGetThreadId(pVnode->pFetchQ));
|
||||||
dInfo("vgId:%d, stream-queue:%p is alloced", pVnode->vgId, pVnode->pStreamQ);
|
dInfo("vgId:%d, stream-queue:%p is alloced", pVnode->vgId, pVnode->pStreamQ);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,9 +92,11 @@ int32_t taosArrayEnsureCap(SArray* pArray, size_t newCap) {
|
||||||
if (newCap * pArray->elemSize > BOUNDARY_SIZE) {
|
if (newCap * pArray->elemSize > BOUNDARY_SIZE) {
|
||||||
factor = BOUNDARY_SMALL_FACTOR;
|
factor = BOUNDARY_SMALL_FACTOR;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t tsize = (pArray->capacity * factor);
|
size_t tsize = (pArray->capacity * factor);
|
||||||
while (newCap > tsize) {
|
while (newCap > tsize) {
|
||||||
tsize = (tsize * factor);
|
size_t newSize = (tsize * factor);
|
||||||
|
tsize = (newSize == tsize) ? (tsize + 2) : newSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
pArray->pData = taosMemoryRealloc(pArray->pData, tsize * pArray->elemSize);
|
pArray->pData = taosMemoryRealloc(pArray->pData, tsize * pArray->elemSize);
|
||||||
|
|
|
@ -21,6 +21,40 @@
|
||||||
int64_t tsRpcQueueMemoryAllowed = 0;
|
int64_t tsRpcQueueMemoryAllowed = 0;
|
||||||
int64_t tsRpcQueueMemoryUsed = 0;
|
int64_t tsRpcQueueMemoryUsed = 0;
|
||||||
|
|
||||||
|
struct STaosQueue {
|
||||||
|
STaosQnode *head;
|
||||||
|
STaosQnode *tail;
|
||||||
|
STaosQueue *next; // for queue set
|
||||||
|
STaosQset *qset; // for queue set
|
||||||
|
void *ahandle; // for queue set
|
||||||
|
FItem itemFp;
|
||||||
|
FItems itemsFp;
|
||||||
|
TdThreadMutex mutex;
|
||||||
|
int64_t memOfItems;
|
||||||
|
int32_t numOfItems;
|
||||||
|
int64_t threadId;
|
||||||
|
int64_t memLimit;
|
||||||
|
int64_t itemLimit;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct STaosQset {
|
||||||
|
STaosQueue *head;
|
||||||
|
STaosQueue *current;
|
||||||
|
TdThreadMutex mutex;
|
||||||
|
tsem_t sem;
|
||||||
|
int32_t numOfQueues;
|
||||||
|
int32_t numOfItems;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct STaosQall {
|
||||||
|
STaosQnode *current;
|
||||||
|
STaosQnode *start;
|
||||||
|
int32_t numOfItems;
|
||||||
|
int64_t memOfItems;
|
||||||
|
int32_t unAccessedNumOfItems;
|
||||||
|
int64_t unAccessMemOfItems;
|
||||||
|
};
|
||||||
|
|
||||||
void taosSetQueueMemoryCapacity(STaosQueue *queue, int64_t cap) { queue->memLimit = cap; }
|
void taosSetQueueMemoryCapacity(STaosQueue *queue, int64_t cap) { queue->memLimit = cap; }
|
||||||
void taosSetQueueCapacity(STaosQueue *queue, int64_t size) { queue->itemLimit = size; }
|
void taosSetQueueCapacity(STaosQueue *queue, int64_t size) { queue->itemLimit = size; }
|
||||||
|
|
||||||
|
@ -497,6 +531,12 @@ int64_t taosQallUnAccessedMemSize(STaosQall *qall) { return qall->unAccessMemOfI
|
||||||
void taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
|
void taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
|
||||||
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
|
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
|
||||||
|
|
||||||
|
void taosQueueSetThreadId(STaosQueue* pQueue, int64_t threadId) {
|
||||||
|
pQueue->threadId = threadId;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t taosQueueGetThreadId(STaosQueue *pQueue) { return pQueue->threadId; }
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
|
|
||||||
void taosResetQsetThread(STaosQset *qset, void *pItem) {
|
void taosResetQsetThread(STaosQset *qset, void *pItem) {
|
||||||
|
|
|
@ -417,9 +417,9 @@ _OVER:
|
||||||
return NULL;
|
return NULL;
|
||||||
} else {
|
} else {
|
||||||
while (worker->pid <= 0) taosMsleep(10);
|
while (worker->pid <= 0) taosMsleep(10);
|
||||||
queue->threadId = worker->pid;
|
|
||||||
uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle,
|
taosQueueSetThreadId(queue, worker->pid);
|
||||||
queue->threadId);
|
uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, worker->pid);
|
||||||
return queue;
|
return queue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue