refactor: add an array list api.

This commit is contained in:
Haojun Liao 2023-01-17 09:43:33 +08:00
parent e5ee9689aa
commit 07cf336fa0
8 changed files with 41 additions and 20 deletions

View File

@ -53,6 +53,7 @@ typedef struct SArray {
* @return * @return
*/ */
SArray* taosArrayInit(size_t size, size_t elemSize); SArray* taosArrayInit(size_t size, size_t elemSize);
SArray* taosArrayInit_s(size_t size, size_t elemSize, size_t initialSize);
/** /**
* *

View File

@ -2353,6 +2353,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) {
if (pBlock->pDataBlock == NULL) { if (pBlock->pDataBlock == NULL) {
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
taosArraySetSize(pBlock->pDataBlock, numOfCols); taosArraySetSize(pBlock->pDataBlock, numOfCols);
} }

View File

@ -300,7 +300,7 @@ typedef struct SCtgSubRes {
ctgSubTaskCbFp fp; ctgSubTaskCbFp fp;
} SCtgSubRes; } SCtgSubRes;
typedef struct SCtgTask { struct SCtgTask {
CTG_TASK_TYPE type; CTG_TASK_TYPE type;
int32_t taskId; int32_t taskId;
SCtgJob* pJob; SCtgJob* pJob;
@ -313,7 +313,7 @@ typedef struct SCtgTask {
SRWLatch lock; SRWLatch lock;
SArray* pParents; SArray* pParents;
SCtgSubRes subRes; SCtgSubRes subRes;
} SCtgTask; };
typedef struct SCtgTaskReq { typedef struct SCtgTaskReq {
SCtgTask* pTask; SCtgTask* pTask;

View File

@ -1705,9 +1705,7 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pTask->msgCtxs = taosArrayInit(pCtx->fetchNum, sizeof(SCtgMsgCtx)); pTask->msgCtxs = taosArrayInit_s(pCtx->fetchNum, sizeof(SCtgMsgCtx), pCtx->fetchNum);
taosArraySetSize(pTask->msgCtxs, pCtx->fetchNum);
for (int32_t i = 0; i < pCtx->fetchNum; ++i) { for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i); SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);
SName* pName = ctgGetFetchName(pCtx->pNames, pFetch); SName* pName = ctgGetFetchName(pCtx->pNames, pFetch);
@ -1842,7 +1840,10 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask* pTask) {
ctgAddFetch(&pCtx->pFetchs, i, -1, &fetchIdx, baseResIdx, 0); ctgAddFetch(&pCtx->pFetchs, i, -1, &fetchIdx, baseResIdx, 0);
baseResIdx += taosArrayGetSize(pReq->pTables); baseResIdx += taosArrayGetSize(pReq->pTables);
taosArraySetSize(pCtx->pResList, baseResIdx); int32_t inc = baseResIdx - taosArrayGetSize(pCtx->pResList);
for(int32_t j = 0; j < inc; ++j) {
taosArrayPush(pCtx->pResList, &(SMetaRes){0});
}
} }
} }
@ -1854,8 +1855,7 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pTask->msgCtxs = taosArrayInit(pCtx->fetchNum, sizeof(SCtgMsgCtx)); pTask->msgCtxs = taosArrayInit_s(pCtx->fetchNum, sizeof(SCtgMsgCtx), pCtx->fetchNum);
taosArraySetSize(pTask->msgCtxs, pCtx->fetchNum);
for (int32_t i = 0; i < pCtx->fetchNum; ++i) { for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i); SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);

View File

@ -2480,20 +2480,20 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
ctgDebug("db %s not in cache", dbFName); ctgDebug("db %s not in cache", dbFName);
for (int32_t i = 0; i < tbNum; ++i) { for (int32_t i = 0; i < tbNum; ++i) {
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1); taosArrayPush(ctx->pResList, &(SMetaData){0});
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
for (int32_t i = 0; i < tbNum; ++i) { for (int32_t i = 0; i < tbNum; ++i) {
SName *pName = taosArrayGet(pList, i); pName = taosArrayGet(pList, i);
pCache = taosHashAcquire(dbCache->tbCache, pName->tname, strlen(pName->tname)); pCache = taosHashAcquire(dbCache->tbCache, pName->tname, strlen(pName->tname));
if (NULL == pCache) { if (NULL == pCache) {
ctgDebug("tb %s not in cache, dbFName:%s", pName->tname, dbFName); ctgDebug("tb %s not in cache, dbFName:%s", pName->tname, dbFName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1); taosArrayPush(ctx->pResList, &(SMetaRes){0});
continue; continue;
} }
@ -2503,7 +2503,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
CTG_UNLOCK(CTG_READ, &pCache->metaLock); CTG_UNLOCK(CTG_READ, &pCache->metaLock);
ctgDebug("tb %s meta not in cache, dbFName:%s", pName->tname, dbFName); ctgDebug("tb %s meta not in cache, dbFName:%s", pName->tname, dbFName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1); taosArrayPush(ctx->pResList, &(SMetaRes){0});
continue; continue;
} }
@ -2576,7 +2576,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
if (NULL == stName) { if (NULL == stName) {
ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", pTableMeta->suid, dbFName); ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", pTableMeta->suid, dbFName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1); taosArrayPush(ctx->pResList, &(SMetaRes){0});
taosMemoryFreeClear(pTableMeta); taosMemoryFreeClear(pTableMeta);
continue; continue;
@ -2588,7 +2588,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
taosHashRelease(dbCache->stbCache, stName); taosHashRelease(dbCache->stbCache, stName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1); taosArrayPush(ctx->pResList, &(SMetaRes){0});
taosMemoryFreeClear(pTableMeta); taosMemoryFreeClear(pTableMeta);
continue; continue;
@ -2603,7 +2603,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
taosHashRelease(dbCache->tbCache, pCache); taosHashRelease(dbCache->tbCache, pCache);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1); taosArrayPush(ctx->pResList, &(SMetaRes){0});
taosMemoryFreeClear(pTableMeta); taosMemoryFreeClear(pTableMeta);
@ -2619,7 +2619,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
nctx.tbInfo.suid); nctx.tbInfo.suid);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1); taosArrayPush(ctx->pResList, &(SMetaRes){0});
taosMemoryFreeClear(pTableMeta); taosMemoryFreeClear(pTableMeta);

View File

@ -812,7 +812,7 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
block->info.hasVarCol = IS_VAR_DATA_TYPE(udfCol->colMeta.type); block->info.hasVarCol = IS_VAR_DATA_TYPE(udfCol->colMeta.type);
block->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); block->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
taosArraySetSize(block->pDataBlock, 1); taosArrayPush(block->pDataBlock, &(SColumnInfoData){0});
SColumnInfoData *col = taosArrayGet(block->pDataBlock, 0); SColumnInfoData *col = taosArrayGet(block->pDataBlock, 0);
SUdfColumnMeta *meta = &udfCol->colMeta; SUdfColumnMeta *meta = &udfCol->colMeta;
col->info.precision = meta->precision; col->info.precision = meta->precision;

View File

@ -17,11 +17,10 @@
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) { int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) {
int32_t blockNum = pReq->blockNum; int32_t blockNum = pReq->blockNum;
SArray* pArray = taosArrayInit(blockNum, sizeof(SSDataBlock)); SArray* pArray = taosArrayInit_s(blockNum, sizeof(SSDataBlock), blockNum);
if (pArray == NULL) { if (pArray == NULL) {
return -1; return -1;
} }
taosArraySetSize(pArray, blockNum);
ASSERT(pReq->blockNum == taosArrayGetSize(pReq->data)); ASSERT(pReq->blockNum == taosArrayGetSize(pReq->data));
ASSERT(pReq->blockNum == taosArrayGetSize(pReq->dataLen)); ASSERT(pReq->blockNum == taosArrayGetSize(pReq->dataLen));
@ -49,7 +48,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
if (pArray == NULL) { if (pArray == NULL) {
return -1; return -1;
} }
taosArraySetSize(pArray, 1); taosArrayPush(pArray, &(SSDataBlock){0});
SRetrieveTableRsp* pRetrieve = pReq->pRetrieve; SRetrieveTableRsp* pRetrieve = pReq->pRetrieve;
SSDataBlock* pDataBlock = taosArrayGet(pArray, 0); SSDataBlock* pDataBlock = taosArrayGet(pArray, 0);
blockDecode(pDataBlock, pRetrieve->data); blockDecode(pDataBlock, pRetrieve->data);

View File

@ -45,6 +45,26 @@ SArray* taosArrayInit(size_t size, size_t elemSize) {
return pArray; return pArray;
} }
SArray* taosArrayInit_s(size_t size, size_t elemSize, size_t initialSize) {
SArray* pArray = taosMemoryMalloc(sizeof(SArray));
if (pArray == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pArray->size = 0;
pArray->pData = taosMemoryCalloc(initialSize, elemSize);
if (pArray->pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pArray);
return NULL;
}
pArray->capacity = initialSize;
pArray->elemSize = elemSize;
return pArray;
}
static int32_t taosArrayResize(SArray* pArray) { static int32_t taosArrayResize(SArray* pArray) {
assert(pArray->size >= pArray->capacity); assert(pArray->size >= pArray->capacity);