From 7021d3f6403860873d4a3715d39f192247b9d5c4 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Wed, 15 Jan 2020 09:49:36 +0800 Subject: [PATCH] refactor codes --- src/client/src/tscJoinProcess.c | 14 +++++---- src/system/detail/inc/vnode.h | 9 ++++++ src/system/detail/src/vnodeQueryImpl.c | 40 +++++++++++++++++++++++--- 3 files changed, 53 insertions(+), 10 deletions(-) diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index 2daa640c13..d1e74f6599 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -315,9 +315,9 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { tscFieldInfoCopyAll(&pQueryInfo->fieldsInfo, &pSupporter->fieldsInfo); /* - * if the first column of the secondary query is not ts function, add this function. + * if the first column of the secondary query is not ts function, add this function. * Because this column is required to filter with timestamp after intersecting. - */ + */ if (pSupporter->exprsInfo.pExprs[0].functionId != TSDB_FUNC_TS) { tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0); } @@ -349,8 +349,7 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { tscPrintSelectClause(pNew, 0); - tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, " - "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", + tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", pSql, pNew, 0, pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pMeterInfo[0]->name); @@ -391,7 +390,10 @@ static void doQuitSubquery(SSqlObj* pParentSql) { } static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter) { - if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { + int32_t numOfTotal = pSupporter->pState->numOfCompleted; + int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); + + if (finished >= numOfTotal) { pSqlObj->res.code = abs(pSupporter->pState->code); tscError("%p all subquery return and query failed, global code:%d", pSqlObj, pSqlObj->res.code); @@ -897,7 +899,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { pTSBuf->f = fopen(pTSBuf->path, "r+"); if (pTSBuf->f == NULL) { - free(pTSBuf); + free(pTSBuf); return NULL; } diff --git a/src/system/detail/inc/vnode.h b/src/system/detail/inc/vnode.h index 435184463b..75c0e8cd61 100644 --- a/src/system/detail/inc/vnode.h +++ b/src/system/detail/inc/vnode.h @@ -239,10 +239,19 @@ typedef struct SQuery { int lfd; // only for query in file, last file handle SCompBlock *pBlock; // only for query in file SField ** pFields; + int numOfBlocks; // only for query in file int blockBufferSize; // length of pBlock buffer int currentSlot; int firstSlot; + + /* + * the two parameters are utilized to handle the data missing situation, caused by import operation. + * When the commit slot is the first slot, and commitPoints != 0 + */ + int32_t commitSlot; // which slot is committed, + int32_t commitPoint; // starting point for next commit + int slot; int pos; TSKEY key; diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 3dc520ad9d..ca9bbfae10 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -1146,6 +1146,32 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv* pRuntimeE // keep the structure as well as the block data into local buffer memcpy(&pRuntimeEnv->cacheBlock, pBlock, sizeof(SCacheBlock)); + // the commit data points will be ignored + int32_t offset = 0; + int32_t numOfPoints = pBlock->numOfPoints; + if (pQuery->firstSlot == pQuery->commitSlot) { + assert(pQuery->commitPoint >= 0 && pQuery->commitPoint <= pBlock->numOfPoints); + + offset = pQuery->commitPoint; + numOfPoints = pBlock->numOfPoints - offset; + + if (offset != 0) { + dTrace("%p ignore the data in cache block that are commit already, numOfblock:%d slot:%d ignore points:%d. " + "first:%d last:%d", GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks, pQuery->slot, pQuery->commitPoint, + pQuery->firstSlot, pQuery->currentSlot); + } + + pBlock->numOfPoints = numOfPoints; + + // current block are all commit already, ignore it + if (pBlock->numOfPoints == 0) { + dTrace("%p ignore current in cache block that are all commit already, numOfblock:%d slot:%d" + "first:%d last:%d", GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks, pQuery->slot, + pQuery->firstSlot, pQuery->currentSlot); + return NULL; + } + } + // keep the data from in cache into the temporarily allocated buffer for(int32_t i = 0; i < pQuery->numOfCols; ++i) { SColumnInfoEx *pColumnInfoEx = &pQuery->colList[i]; @@ -1164,9 +1190,9 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv* pRuntimeE assert(pCol->colId == pQuery->colList[i].data.colId && bytes == pColumnInfoEx->data.bytes && type == pColumnInfoEx->data.type); - memcpy(dst, pBlock->offset[columnIndex], pBlock->numOfPoints * bytes); + memcpy(dst, pBlock->offset[columnIndex] + offset * bytes, numOfPoints * bytes); } else { - setNullN(dst, type, bytes, pBlock->numOfPoints); + setNullN(dst, type, bytes, numOfPoints); } } @@ -2500,18 +2526,24 @@ void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t v // commitSlot here denotes the first uncommitted block in cache int32_t numOfBlocks = 0; int32_t lastSlot = 0; + int32_t commitSlot = 0; + int32_t commitPoint = 0; SCachePool *pPool = (SCachePool *)vnodeList[vid].pCachePool; pthread_mutex_lock(&pPool->vmutex); numOfBlocks = pCacheInfo->numOfBlocks; lastSlot = pCacheInfo->currentSlot; + commitSlot = pCacheInfo->commitSlot; + commitPoint = pCacheInfo->commitPoint; pthread_mutex_unlock(&pPool->vmutex); // make sure it is there, otherwise, return right away pQuery->currentSlot = lastSlot; pQuery->numOfBlocks = numOfBlocks; pQuery->firstSlot = getFirstCacheSlot(numOfBlocks, lastSlot, pCacheInfo); - + pQuery->commitSlot = commitSlot; + pQuery->commitPoint = commitPoint; + /* * Note: the block id is continuous increasing, never becomes smaller. * @@ -4437,7 +4469,7 @@ static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pbl pSummary->fileTimeUs += (taosGetTimestampUs() - start); } else { - assert(vnodeIsDatablockLoaded(pRuntimeEnv, pRuntimeEnv->pMeterObj, -1, true)); + assert(vnodeIsDatablockLoaded(pRuntimeEnv, pRuntimeEnv->pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD); SCacheBlock *pBlock = getCacheDataBlock(pRuntimeEnv->pMeterObj, pRuntimeEnv, pQuery->slot); *pblockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_CACHE_BLOCK);