refactor: fix typo and do some internal refactor.
This commit is contained in:
parent
6acdfdd9ab
commit
d06bcb2219
|
@ -133,13 +133,6 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
|
||||||
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal);
|
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal);
|
||||||
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds);
|
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds);
|
||||||
|
|
||||||
/**
|
|
||||||
* kill the ongoing query and free the query handle and corresponding resources automatically
|
|
||||||
* @param tinfo qhandle
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
int32_t qKillTask(qTaskInfo_t tinfo);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* kill the ongoing query asynchronously
|
* kill the ongoing query asynchronously
|
||||||
* @param tinfo qhandle
|
* @param tinfo qhandle
|
||||||
|
|
|
@ -38,7 +38,6 @@ extern "C" {
|
||||||
#define TARRAY_MIN_SIZE 8
|
#define TARRAY_MIN_SIZE 8
|
||||||
#define TARRAY_GET_ELEM(array, index) ((void*)((char*)((array)->pData) + (index) * (array)->elemSize))
|
#define TARRAY_GET_ELEM(array, index) ((void*)((char*)((array)->pData) + (index) * (array)->elemSize))
|
||||||
#define TARRAY_ELEM_IDX(array, ele) (POINTER_DISTANCE(ele, (array)->pData) / (array)->elemSize)
|
#define TARRAY_ELEM_IDX(array, ele) (POINTER_DISTANCE(ele, (array)->pData) / (array)->elemSize)
|
||||||
#define TARRAY_GET_START(array) ((array)->pData)
|
|
||||||
|
|
||||||
typedef struct SArray {
|
typedef struct SArray {
|
||||||
size_t size;
|
size_t size;
|
||||||
|
@ -71,14 +70,6 @@ int32_t taosArrayEnsureCap(SArray* pArray, size_t tsize);
|
||||||
*/
|
*/
|
||||||
void* taosArrayAddBatch(SArray* pArray, const void* pData, int32_t nEles);
|
void* taosArrayAddBatch(SArray* pArray, const void* pData, int32_t nEles);
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
* @param pArray
|
|
||||||
* @param pData position array list
|
|
||||||
* @param numOfElems the number of removed position
|
|
||||||
*/
|
|
||||||
void taosArrayRemoveBatch(SArray* pArray, const int32_t* pData, int32_t numOfElems);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
* @param pArray
|
* @param pArray
|
||||||
|
@ -266,13 +257,6 @@ void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t compa
|
||||||
*/
|
*/
|
||||||
int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t comparFn, int32_t flags);
|
int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t comparFn, int32_t flags);
|
||||||
|
|
||||||
/**
|
|
||||||
* search the array
|
|
||||||
* @param pArray
|
|
||||||
* @param key
|
|
||||||
*/
|
|
||||||
char* taosArraySearchString(const SArray* pArray, const char* key, __compar_fn_t comparFn, int32_t flags);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* sort the pointer data in the array
|
* sort the pointer data in the array
|
||||||
* @param pArray
|
* @param pArray
|
||||||
|
@ -286,8 +270,6 @@ void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* par
|
||||||
int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode);
|
int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode);
|
||||||
void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz);
|
void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz);
|
||||||
|
|
||||||
char* taosShowStrArray(const SArray* pArray);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* swap array
|
* swap array
|
||||||
* @param a
|
* @param a
|
||||||
|
@ -295,6 +277,7 @@ char* taosShowStrArray(const SArray* pArray);
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
void taosArraySwap(SArray* a, SArray* b);
|
void taosArraySwap(SArray* a, SArray* b);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -1506,7 +1506,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
if (pCommitter->dWriter.bData.nRow >= pCommitter->maxRow) {
|
if (pCommitter->dWriter.bDatal.nRow >= pCommitter->maxRow) {
|
||||||
code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
|
code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
|
||||||
pCommitter->cmprAlg);
|
pCommitter->cmprAlg);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
|
@ -152,8 +152,8 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
|
||||||
pInfo->loadBlocks += 1;
|
pInfo->loadBlocks += 1;
|
||||||
|
|
||||||
tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64
|
tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64
|
||||||
", last file index:%d, last block index:%d, entry:%d, %p, elapsed time:%.2f ms, %s",
|
", last file index:%d, last block index:%d, entry:%d, rows:%d, %p, elapsed time:%.2f ms, %s",
|
||||||
pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock, el,
|
pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock->nRow, pBlock, el,
|
||||||
idStr);
|
idStr);
|
||||||
|
|
||||||
pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
|
pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
|
||||||
|
|
|
@ -2139,12 +2139,13 @@ static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; }
|
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; }
|
||||||
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
|
|
||||||
if (pBlockData->nRow > 0) {
|
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
|
||||||
ASSERT(pBlockData->nRow == pDumpInfo->totalRows);
|
if (pBlockData->nRow > 0) {
|
||||||
|
ASSERT(pBlockData->nRow == pDumpInfo->totalRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
|
return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
|
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
|
||||||
|
@ -2619,6 +2620,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
ASSERT(tsLast >= pBlock->maxKey.ts);
|
ASSERT(tsLast >= pBlock->maxKey.ts);
|
||||||
tBlockDataReset(&pReader->status.fileBlockData);
|
tBlockDataReset(&pReader->status.fileBlockData);
|
||||||
|
|
||||||
|
tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);
|
||||||
code = buildComposedDataBlock(pReader);
|
code = buildComposedDataBlock(pReader);
|
||||||
} else { // whole block is required, return it directly
|
} else { // whole block is required, return it directly
|
||||||
SDataBlockInfo* pInfo = &pReader->pResBlock->info;
|
SDataBlockInfo* pInfo = &pReader->pResBlock->info;
|
||||||
|
|
|
@ -564,23 +564,6 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
|
||||||
return pTaskInfo->code;
|
return pTaskInfo->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qKillTask(qTaskInfo_t qinfo) {
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
|
|
||||||
if (pTaskInfo == NULL) {
|
|
||||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
|
||||||
}
|
|
||||||
|
|
||||||
qAsyncKillTask(qinfo);
|
|
||||||
|
|
||||||
// Wait for the query executing thread being stopped/
|
|
||||||
// Once the query is stopped, the owner of qHandle will be cleared immediately.
|
|
||||||
while (pTaskInfo->owner != 0) {
|
|
||||||
taosMsleep(100);
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
|
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
|
||||||
|
|
||||||
|
|
|
@ -3640,9 +3640,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
return pOperator;
|
return pOperator;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t num = 0;
|
size_t size = LIST_LENGTH(pPhyNode->pChildren);
|
||||||
size_t size = LIST_LENGTH(pPhyNode->pChildren);
|
|
||||||
|
|
||||||
SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
|
SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
|
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
|
||||||
|
|
|
@ -91,48 +91,6 @@ void* taosArrayAddBatch(SArray* pArray, const void* pData, int32_t nEles) {
|
||||||
return dst;
|
return dst;
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosArrayRemoveBatch(SArray* pArray, const int32_t* pData, int32_t numOfElems) {
|
|
||||||
assert(pArray != NULL && pData != NULL);
|
|
||||||
if (numOfElems <= 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t size = taosArrayGetSize(pArray);
|
|
||||||
if (numOfElems >= size) {
|
|
||||||
taosArrayClear(pArray);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t i = pData[0] + 1, j = 0;
|
|
||||||
while (i < size) {
|
|
||||||
if (j == numOfElems - 1) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
char* p = TARRAY_GET_ELEM(pArray, i);
|
|
||||||
if (i > pData[j] && i < pData[j + 1]) {
|
|
||||||
char* dst = TARRAY_GET_ELEM(pArray, i - (j + 1));
|
|
||||||
memmove(dst, p, pArray->elemSize);
|
|
||||||
} else if (i == pData[j + 1]) {
|
|
||||||
j += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
i += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(i == pData[numOfElems - 1] + 1 && i <= size);
|
|
||||||
|
|
||||||
int32_t srcIndex = pData[numOfElems - 1] + 1;
|
|
||||||
int32_t dstIndex = pData[numOfElems - 1] - numOfElems + 1;
|
|
||||||
if (pArray->size - srcIndex > 0) {
|
|
||||||
char* dst = TARRAY_GET_ELEM(pArray, dstIndex);
|
|
||||||
char* src = TARRAY_GET_ELEM(pArray, srcIndex);
|
|
||||||
memmove(dst, src, pArray->elemSize * (pArray->size - srcIndex));
|
|
||||||
}
|
|
||||||
|
|
||||||
pArray->size -= numOfElems;
|
|
||||||
}
|
|
||||||
|
|
||||||
void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)) {
|
void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)) {
|
||||||
assert(pArray);
|
assert(pArray);
|
||||||
|
|
||||||
|
@ -435,17 +393,6 @@ void taosArraySortString(SArray* pArray, __compar_fn_t comparFn) {
|
||||||
taosSort(pArray->pData, pArray->size, pArray->elemSize, comparFn);
|
taosSort(pArray->pData, pArray->size, pArray->elemSize, comparFn);
|
||||||
}
|
}
|
||||||
|
|
||||||
char* taosArraySearchString(const SArray* pArray, const char* key, __compar_fn_t comparFn, int32_t flags) {
|
|
||||||
assert(pArray != NULL);
|
|
||||||
assert(key != NULL);
|
|
||||||
|
|
||||||
void* p = taosbsearch(&key, pArray->pData, pArray->size, pArray->elemSize, comparFn, flags);
|
|
||||||
if (p == NULL) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
return *(char**)p;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t taosArrayPartition(SArray* pArray, int32_t i, int32_t j, __ext_compar_fn_t fn, const void* userData) {
|
static int32_t taosArrayPartition(SArray* pArray, int32_t i, int32_t j, __ext_compar_fn_t fn, const void* userData) {
|
||||||
void* key = taosArrayGetP(pArray, i);
|
void* key = taosArrayGetP(pArray, i);
|
||||||
while (i < j) {
|
while (i < j) {
|
||||||
|
@ -543,26 +490,7 @@ void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t
|
||||||
void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param) {
|
void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param) {
|
||||||
taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param);
|
taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param);
|
||||||
}
|
}
|
||||||
// TODO(yihaoDeng) add order array<type>
|
|
||||||
//
|
|
||||||
|
|
||||||
char* taosShowStrArray(const SArray* pArray) {
|
|
||||||
int32_t sz = pArray->size;
|
|
||||||
int32_t tlen = 0;
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
tlen += strlen(taosArrayGetP(pArray, i)) + 1;
|
|
||||||
}
|
|
||||||
char* res = taosMemoryCalloc(1, tlen);
|
|
||||||
char* buf = res;
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
char* str = taosArrayGetP(pArray, i);
|
|
||||||
int32_t len = strlen(str);
|
|
||||||
memcpy(buf, str, len);
|
|
||||||
buf += len;
|
|
||||||
if (i != sz - 1) *buf = ',';
|
|
||||||
}
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
void taosArraySwap(SArray* a, SArray* b) {
|
void taosArraySwap(SArray* a, SArray* b) {
|
||||||
if (a == NULL || b == NULL) return;
|
if (a == NULL || b == NULL) return;
|
||||||
size_t t = a->size;
|
size_t t = a->size;
|
||||||
|
|
Loading…
Reference in New Issue