Merge pull request #16033 from taosdata/fix/TD-18075
fix: fix taosd put blocks to sink error
This commit is contained in:
commit
46a6b37779
|
@ -916,7 +916,7 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo
|
||||||
int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
|
int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
|
||||||
if (vgNum <= 0) {
|
if (vgNum <= 0) {
|
||||||
ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
|
ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
|
||||||
CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
|
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
tableNameHashFp fp = NULL;
|
tableNameHashFp fp = NULL;
|
||||||
|
@ -931,6 +931,7 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo
|
||||||
for (int32_t i = 0; i < tbNum; ++i) {
|
for (int32_t i = 0; i < tbNum; ++i) {
|
||||||
vgInfo = taosMemoryMalloc(sizeof(SVgroupInfo));
|
vgInfo = taosMemoryMalloc(sizeof(SVgroupInfo));
|
||||||
if (NULL == vgInfo) {
|
if (NULL == vgInfo) {
|
||||||
|
taosHashCancelIterate(dbInfo->vgHash, pIter);
|
||||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -980,7 +981,6 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo
|
||||||
|
|
||||||
if (NULL == p) {
|
if (NULL == p) {
|
||||||
ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName, taosHashGetSize(dbInfo->vgHash));
|
ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName, taosHashGetSize(dbInfo->vgHash));
|
||||||
ASSERT(0);
|
|
||||||
taosArrayDestroy(pVgList);
|
taosArrayDestroy(pVgList);
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
|
@ -101,12 +101,14 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
|
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
|
||||||
|
/*
|
||||||
uint32_t capacity = pDispatcher->pManager->cfg.maxDataBlockNumPerQuery;
|
uint32_t capacity = pDispatcher->pManager->cfg.maxDataBlockNumPerQuery;
|
||||||
if (taosQueueItemSize(pDispatcher->pDataBlocks) > capacity) {
|
if (taosQueueItemSize(pDispatcher->pDataBlocks) > capacity) {
|
||||||
qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity,
|
qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity,
|
||||||
taosQueueItemSize(pDispatcher->pDataBlocks));
|
taosQueueItemSize(pDispatcher->pDataBlocks));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
pBuf->allocSize = sizeof(SDataCacheEntry) + blockGetEncodeSize(pInput->pData);
|
pBuf->allocSize = sizeof(SDataCacheEntry) + blockGetEncodeSize(pInput->pData);
|
||||||
|
|
||||||
|
|
|
@ -302,6 +302,7 @@ void *taosMemoryStrDup(const char *ptr) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosMemoryFree(void *ptr) {
|
void taosMemoryFree(void *ptr) {
|
||||||
|
if (NULL == ptr) return;
|
||||||
#ifdef USE_TD_MEMORY
|
#ifdef USE_TD_MEMORY
|
||||||
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo));
|
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo));
|
||||||
if (pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL) {
|
if (pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL) {
|
||||||
|
|
Loading…
Reference in New Issue