Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/vnode
commit 041c977c5d
@@ -76,25 +76,20 @@ void indexOptsDestroy(SIndexOpts* opts);
 * @param:
 */
SIndexTerm* indexTermCreate(int64_t suid,
                            SIndexOperOnColumn operType,
                            uint8_t colType,
                            const char* colName,
                            int32_t nColName,
                            const char* colVal,
                            int32_t nColVal);
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn operType, uint8_t colType, const char* colName,
                            int32_t nColName, const char* colVal, int32_t nColVal);
void indexTermDestroy(SIndexTerm* p);

/*
 * init index
 *
 */
int32_t indexInit();
/*
 * destroy index
 * init index env
 *
 */
void indexInit();

/*
 * destroy index env
 *
 */
void indexCleanUp();

#ifdef __cplusplus
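For context, a minimal usage sketch of the reformatted single-line indexTermCreate declaration together with the multi-term helpers that appear later in this commit (indexMultiTermCreate, indexMultiTermAdd, indexPut, indexMultiTermDestroy); the opened SIndex* handle pIdx, the suid, and the colName/tagVal buffers are all assumed to come from the caller:

// Sketch only: build one binary tag term and submit it through an opened index handle.
SIndexMultiTerm* terms = indexMultiTermCreate();
SIndexTerm*      term  = indexTermCreate(suid, ADD_VALUE, TSDB_DATA_TYPE_BINARY,
                                         colName, strlen(colName), tagVal, strlen(tagVal));
indexMultiTermAdd(terms, term);
int ret = indexPut(pIdx, terms);   // pIdx: SIndex* returned by indexOpen()
indexMultiTermDestroy(terms);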
@@ -91,9 +91,10 @@ enum {
  META_TYPE_NON_TABLE = 1,
  META_TYPE_CTABLE,
  META_TYPE_TABLE,
  META_TYPE_BOTH_TABLE,
  META_TYPE_BOTH_TABLE
};

typedef struct STableMetaOutput {
  int32_t metaType;
  char    ctbFname[TSDB_TABLE_FNAME_LEN];
@@ -24,7 +24,7 @@ extern "C" {
#include "catalog.h"

typedef struct SSchedulerCfg {
  int32_t  maxJobNum;
  uint32_t maxJobNum;
} SSchedulerCfg;

typedef struct SQueryProfileSummary {
@@ -51,6 +51,7 @@ void taosFreeQitem(void *pItem);
int32_t taosWriteQitem(STaosQueue *queue, void *pItem);
int32_t taosReadQitem(STaosQueue *queue, void **ppItem);
bool    taosQueueEmpty(STaosQueue *queue);
int32_t taosQueueSize(STaosQueue *queue);

STaosQall *taosAllocateQall();
void       taosFreeQall(STaosQall *qall);
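As a quick orientation for the newly exported taosQueueSize, a hedged sketch of a queue round trip using only calls visible in this commit; the SDemoItem payload type is hypothetical:

// Hypothetical payload; the queue API calls are the ones declared above.
typedef struct SDemoItem { int32_t value; } SDemoItem;

void demoQueueUsage() {
  STaosQueue *queue = taosOpenQueue();

  SDemoItem *pItem = taosAllocateQitem(sizeof(SDemoItem));
  pItem->value = 1;
  taosWriteQitem(queue, pItem);

  // The new accessor reports the backlog without draining the queue.
  if (taosQueueSize(queue) > 0) {
    void *pRead = NULL;
    taosReadQitem(queue, &pRead);
    taosFreeQitem(pRead);
  }

  taosCloseQueue(queue);
}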
@@ -19,11 +19,13 @@ add_library(meta STATIC ${META_SRC})
target_include_directories(
    meta
    PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/vnode/meta"
    PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/index"
    PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
    meta
    PUBLIC common
    PUBLIC index
)

if(${META_DB_IMPL} STREQUAL "BDB")
@@ -13,9 +13,13 @@
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "index.h"
#include "metaDef.h"

struct SMetaIdx {
#ifdef USE_INVERTED_INDEX
  SIndex *pIdx;
#endif
  /* data */
};
@@ -43,6 +47,13 @@ int metaOpenIdx(SMeta *pMeta) {
  rocksdb_options_destroy(options);
#endif

#ifdef USE_INVERTED_INDEX
  SIndexOpts opts;
  if (indexOpen(&opts, pMeta->path, &pMeta->pIdx->pIdx) != 0) {
    return -1;
  }

#endif
  return 0;
}
@@ -53,14 +64,47 @@ void metaCloseIdx(SMeta *pMeta) { /* TODO */
  pMeta->pIdx = NULL;
}
#endif

#ifdef USE_INVERTED_INDEX
  SIndexOpts opts;
  if (indexClose(pMeta->pIdx->pIdx) != 0) {
    return -1;
  }

#endif
}

int metaSaveTableToIdx(SMeta *pMeta, const STbCfg *pTbOptions) {
int metaSaveTableToIdx(SMeta *pMeta, const STbCfg *pTbCfg) {
#ifdef USE_INVERTED_INDEX
  if (pTbCfg->type == META_CHILD_TABLE) {
    char buf[8] = {0};
    int16_t colId = (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId;
    sprintf(buf, "%d", colId);  // colname

    char *pTagVal = (char *)tdGetKVRowValOfCol(pTbCfg->ctbCfg.pTag, (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId);

    tb_uid_t suid = pTbCfg->ctbCfg.suid;  // super id
    tb_uid_t tuid = 0;                    // child table uid
    SIndexMultiTerm *terms = indexMultiTermCreate();
    SIndexTerm *     term =
        indexTermCreate(suid, ADD_VALUE, TSDB_DATA_TYPE_BINARY, buf, strlen(buf), pTagVal, strlen(pTagVal), tuid);
    indexMultiTermAdd(terms, term);

    int ret = indexPut(pMeta->pIdx->pIdx, terms);
    indexMultiTermDestroy(terms);
    return ret;
  } else {
    return DB_DONOTINDEX;
  }
#endif
  // TODO
  return 0;
}

int metaRemoveTableFromIdx(SMeta *pMeta, tb_uid_t uid) {
#ifdef USE_INVERTED_INDEX

#endif
  // TODO
  return 0;
}
}
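Taken together, the two hunks above pair indexOpen and indexClose around the meta index; a compressed lifecycle sketch, assuming pMeta->pIdx is already allocated and ignoring the surrounding #ifdef plumbing:

#ifdef USE_INVERTED_INDEX
// Sketch only: open the index under the meta path, index tables, then close it.
static int demoMetaIdxLifecycle(SMeta *pMeta) {
  SIndexOpts opts;
  if (indexOpen(&opts, pMeta->path, &pMeta->pIdx->pIdx) != 0) {
    return -1;  // nothing to close on failure
  }
  /* ... metaSaveTableToIdx() is called per child table here ... */
  return indexClose(pMeta->pIdx->pIdx);
}
#endif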
@@ -22,9 +22,10 @@ int metaOpenUidGnrt(SMeta *pMeta) {
  return 0;
}

void metaCloseUidGnrt(SMeta *pMeta) { /* TODO */ }
void metaCloseUidGnrt(SMeta *pMeta) { /* TODO */
}

tb_uid_t metaGenerateUid(SMeta *pMeta) {
  // Generate a new table UID
  return ++(pMeta->uidGnrt.nextUid);
}
}
@@ -1226,6 +1226,16 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
  tNameGetFullDbName(pTableName, db);
  CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, db, false, &dbVgroup));

  // REMOVE THIS ....
  if (0 == tbMeta->vgId) {
    SVgroupInfo vgroup = {0};

    catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pTableName, &vgroup);

    tbMeta->vgId = vgroup.vgId;
  }
  // REMOVE THIS ....

  if (tbMeta->tableType == TSDB_SUPER_TABLE) {
    CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, dbVgroup, pVgroupList));
  } else {
@@ -26,17 +26,26 @@ extern "C" {
struct SDataSink;
struct SDataSinkHandle;

typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SDataResult* pRes);
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, char* pData, int32_t* pLen);
typedef struct SDataSinkManager {
  SDataSinkMgtCfg cfg;
  pthread_mutex_t mutex;
} SDataSinkManager;

typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus);
typedef void (*FEndPut)(struct SDataSinkHandle* pHandle);
typedef int32_t (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pStatus);
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus);
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);

typedef struct SDataSinkHandle {
  FPutDataBlock fPut;
  FGetDataBlock fGet;
  FEndPut fEndPut;
  FGetDataLength fGetLen;
  FGetDataBlock fGetData;
  FDestroyDataSinker fDestroy;
} SDataSinkHandle;

int32_t createDataDispatcher(const struct SDataSink* pDataSink, DataSinkHandle* pHandle);
int32_t createDataDispatcher(SDataSinkManager* pManager, const struct SDataSink* pDataSink, DataSinkHandle* pHandle);

#ifdef __cplusplus
}
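The handle above is effectively a small vtable; a hedged sketch of how an alternative sinker would wire it up (every demo* name is hypothetical, only the SDataSinkHandle fields come from this header):

// Hypothetical sinker skeleton; mirrors how SDataDispatchHandle embeds the handle first.
typedef struct SDemoSinkHandle {
  SDataSinkHandle sink;  // must be the first member so the handle can be cast back
  /* sinker-private state follows */
} SDemoSinkHandle;

static int32_t demoPut(struct SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus);
static void    demoEndPut(struct SDataSinkHandle* pHandle);
static int32_t demoGetLen(struct SDataSinkHandle* pHandle, int32_t* pStatus);
static int32_t demoGetData(struct SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus);
static int32_t demoDestroy(struct SDataSinkHandle* pHandle);

static void demoInitHandle(SDemoSinkHandle* pDemo) {
  pDemo->sink.fPut     = demoPut;
  pDemo->sink.fEndPut  = demoEndPut;
  pDemo->sink.fGetLen  = demoGetLen;
  pDemo->sink.fGetData = demoGetData;
  pDemo->sink.fDestroy = demoDestroy;
}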
@@ -26,6 +26,8 @@ extern "C" {
#define DS_CAPACITY_ENOUGH 1
#define DS_CAPACITY_FULL 2
#define DS_NEED_SCHEDULE 3
#define DS_END 4
#define DS_IN_PROCESS 5

struct SDataSink;
struct SSDataBlock;
@@ -39,11 +41,16 @@ int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg);

typedef void* DataSinkHandle;

typedef struct SDataResult {
  SQueryCostInfo profile;
typedef struct SInputData {
  const SSDataBlock* pData;
  SHashObj* pTableRetrieveTsMap;
} SDataResult;
} SInputData;

typedef struct SOutPutData {
  int32_t numOfRows;
  int8_t  compressed;
  char*   pData;
} SOutPutData;

/**
 * Create a subplan's datasinker handle for all later operations.
@@ -59,30 +66,25 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH
 * @param pRes
 * @return error code
 */
int32_t dsPutDataBlock(DataSinkHandle handle, const SDataResult* pRes);
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus);

void dsEndPut(DataSinkHandle handle);

/**
 * Get the length of the data returned by the next call to dsGetDataBlock.
 * @param handle
 * @return data length
 */
int32_t dsGetDataLength(DataSinkHandle handle);
int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus);

/**
 * Get data, the caller needs to allocate data memory.
 * @param handle
 * @param pData output
 * @param pLen output
 * @param pOutput output
 * @param pStatus output
 * @return error code
 */
int32_t dsGetDataBlock(DataSinkHandle handle, char* pData, int32_t* pLen);

/**
 * Get the datasinker state, after each dsPutDataBlock and dsGetDataBlock call.
 * @param handle
 * @return datasinker status
 */
int32_t dsGetStatus(DataSinkHandle handle);
int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus);

/**
 * After dsGetStatus returns DS_NEED_SCHEDULE, the caller needs to put this into the work queue.
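A hedged end-to-end sketch of the revised put/get contract above; the handle is assumed to come from dsCreateDataSinker, the SInputData from the executor, and the status handling is simplified:

// Sketch only: one put followed by one get, checking the DS_* status codes defined earlier.
void demoSinkFlow(DataSinkHandle handle, const SInputData* pInput) {
  int32_t status = 0;

  if (dsPutDataBlock(handle, pInput, &status) == TSDB_CODE_SUCCESS && status == DS_CAPACITY_FULL) {
    /* producer should back off before pushing more blocks */
  }
  dsEndPut(handle);  // no more input for this subplan

  int32_t len = dsGetDataLength(handle, &status);
  if (len > 0) {
    SOutPutData output = {0};
    output.pData = malloc(len);          // the caller allocates the data memory
    dsGetDataBlock(handle, &output, &status);
    /* ... ship output.numOfRows rows, possibly compressed ... */
    free(output.pData);
  }
}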
@@ -20,21 +20,29 @@
#include "tglobal.h"
#include "tqueue.h"

#define GET_BUF_DATA(buf) (buf)->pData + (buf)->pos
#define GET_BUF_REMAIN(buf) (buf)->remain
#define DATA_META_LENGTH(tables) (sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(tables) + sizeof(SRetrieveTableRsp))

typedef struct SBuf {
  int32_t size;
  int32_t pos;
  int32_t remain;
typedef struct SDataDispatchBuf {
  int32_t useSize;
  int32_t allocSize;
  char*   pData;
} SBuf;
} SDataDispatchBuf;

typedef struct SDataCacheEntry {
  int32_t dataLen;
  int32_t numOfRows;
  int8_t  compressed;
  char    data[];
} SDataCacheEntry;

typedef struct SDataDispatchHandle {
  SDataSinkHandle sink;
  SDataSinkManager* pManager;
  SDataBlockSchema schema;
  STaosQueue* pDataBlocks;
  SBuf buf;
  SDataDispatchBuf nextOutput;
  int32_t status;
  pthread_mutex_t mutex;
} SDataDispatchHandle;

static bool needCompress(const SSDataBlock* pData, const SDataBlockSchema* pSchema) {
@@ -53,87 +61,156 @@ static bool needCompress(const SSDataBlock* pData, const SDataBlockSchema* pSche
  return false;
}

static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) {
static int32_t compressColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) {
  int32_t colSize = pColRes->info.bytes * numOfRows;
  return (*(tDataTypes[pColRes->info.type].compFunc))(
      pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
}

static void doCopyQueryResultToMsg(const SDataResult* pRes, const SDataBlockSchema* pSchema, char* data, int8_t compressed, int32_t *compLen) {
static void copyData(const SInputData* pInput, const SDataBlockSchema* pSchema, char* data, int8_t compressed, int32_t *compLen) {
  int32_t *compSizes = (int32_t*)data;
  if (compressed) {
    data += pSchema->numOfCols * sizeof(int32_t);
  }

  for (int32_t col = 0; col < pSchema->numOfCols; ++col) {
    SColumnInfoData* pColRes = taosArrayGet(pRes->pData->pDataBlock, col);
    SColumnInfoData* pColRes = taosArrayGet(pInput->pData->pDataBlock, col);
    if (compressed) {
      compSizes[col] = compressQueryColData(pColRes, pRes->pData->info.rows, data, compressed);
      compSizes[col] = compressColData(pColRes, pInput->pData->info.rows, data, compressed);
      data += compSizes[col];
      *compLen += compSizes[col];
      compSizes[col] = htonl(compSizes[col]);
    } else {
      memmove(data, pColRes->pData, pColRes->info.bytes * pRes->pData->info.rows);
      data += pColRes->info.bytes * pRes->pData->info.rows;
      memmove(data, pColRes->pData, pColRes->info.bytes * pInput->pData->info.rows);
      data += pColRes->info.bytes * pInput->pData->info.rows;
    }
  }

  int32_t numOfTables = (int32_t) taosHashGetSize(pRes->pTableRetrieveTsMap);
  int32_t numOfTables = (int32_t) taosHashGetSize(pInput->pTableRetrieveTsMap);
  *(int32_t*)data = htonl(numOfTables);
  data += sizeof(int32_t);

  STableIdInfo* item = taosHashIterate(pRes->pTableRetrieveTsMap, NULL);
  STableIdInfo* item = taosHashIterate(pInput->pTableRetrieveTsMap, NULL);
  while (item) {
    STableIdInfo* pDst = (STableIdInfo*)data;
    pDst->uid = htobe64(item->uid);
    pDst->key = htobe64(item->key);
    data += sizeof(STableIdInfo);
    item = taosHashIterate(pRes->pTableRetrieveTsMap, item);
    item = taosHashIterate(pInput->pTableRetrieveTsMap, item);
  }
}

static void toRetrieveResult(SDataDispatchHandle* pHandle, const SDataResult* pRes, char* pData, int32_t* pContLen) {
  SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)pData;
  pRsp->useconds = htobe64(pRes->profile.elapsedTime);
  pRsp->precision = htons(pHandle->schema.precision);
  pRsp->compressed = (int8_t)needCompress(pRes->pData, &(pHandle->schema));
  pRsp->numOfRows = htonl(pRes->pData->info.rows);
// data format with compress: SDataCacheEntry | cols_data_offset | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ...
// data format: SDataCacheEntry | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ...
static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
  SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
  pEntry->compressed = (int8_t)needCompress(pInput->pData, &(pHandle->schema));
  pEntry->numOfRows = pInput->pData->info.rows;

  *pContLen = sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(pRes->pTableRetrieveTsMap) + sizeof(SRetrieveTableRsp);
  doCopyQueryResultToMsg(pRes, &pHandle->schema, pRsp->data, pRsp->compressed, &pRsp->compLen);
  *pContLen += (pRsp->compressed ? pRsp->compLen : pHandle->schema.resultRowSize * pRes->pData->info.rows);

  pRsp->compLen = htonl(pRsp->compLen);
  pBuf->useSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap);
  copyData(pInput, &pHandle->schema, pEntry->data, pEntry->compressed, &pEntry->dataLen);
  pBuf->useSize += (pEntry->compressed ? pEntry->dataLen : pHandle->schema.resultRowSize * pInput->pData->info.rows);
  // todo completed
}

static int32_t putDataBlock(SDataSinkHandle* pHandle, const SDataResult* pRes) {
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
  int32_t useSize = 0;
  toRetrieveResult(pDispatcher, pRes, GET_BUF_DATA(&pDispatcher->buf), &useSize);
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
  if (taosQueueSize(pDispatcher->pDataBlocks) >= pDispatcher->pManager->cfg.maxDataBlockNumPerQuery) {
    return false;
  }
  pBuf->allocSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap) + pDispatcher->schema.resultRowSize * pInput->pData->info.rows;
  pBuf->pData = malloc(pBuf->allocSize);
  return NULL != pBuf->pData;
}

static int32_t getDataBlock(SDataSinkHandle* pHandle, char* pData, int32_t* pLen) {
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
  pthread_mutex_lock(&pDispatcher->mutex);
  int32_t status = taosQueueSize(pDispatcher->pDataBlocks) < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_CAPACITY_ENOUGH : DS_CAPACITY_FULL;
  pDispatcher->status = status;
  pthread_mutex_unlock(&pDispatcher->mutex);
  return status;
}

static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
  pthread_mutex_lock(&pDispatcher->mutex);
  int32_t status = pDispatcher->status;
  pthread_mutex_unlock(&pDispatcher->mutex);
  return status;
}

static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus) {
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
  SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf));
  if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) {
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }
  toDataCacheEntry(pDispatcher, pInput, pBuf);
  taosWriteQitem(pDispatcher->pDataBlocks, pBuf);
  *pStatus = updateStatus(pDispatcher);
  return TSDB_CODE_SUCCESS;
}

static void endPut(struct SDataSinkHandle* pHandle) {
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
  pthread_mutex_lock(&pDispatcher->mutex);
  pDispatcher->status = DS_END;
  pthread_mutex_unlock(&pDispatcher->mutex);
}

static int32_t getDataLength(SDataSinkHandle* pHandle, int32_t* pStatus) {
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
  if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
    *pStatus = getStatus(pDispatcher) ? DS_END : DS_IN_PROCESS;
    return 0;
  }
  SDataDispatchBuf* pBuf = NULL;
  taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
  memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
  taosFreeQitem(pBuf);
  return ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen;
}

static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus) {
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
  SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
  memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
  pOutput->numOfRows = pEntry->numOfRows;
  pOutput->compressed = pEntry->compressed;
  tfree(pDispatcher->nextOutput.pData);  // todo persistent
  *pStatus = updateStatus(pDispatcher);
  return TSDB_CODE_SUCCESS;
}

static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
  tfree(pDispatcher->nextOutput.pData);
  while (!taosQueueEmpty(pDispatcher->pDataBlocks)) {
    SDataDispatchBuf* pBuf = NULL;
    taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
    tfree(pBuf->pData);
    taosFreeQitem(pBuf);
  }
  taosCloseQueue(pDispatcher->pDataBlocks);
  pthread_mutex_destroy(&pDispatcher->mutex);
}

int32_t createDataDispatcher(const SDataSink* pDataSink, DataSinkHandle* pHandle) {
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSink* pDataSink, DataSinkHandle* pHandle) {
  SDataDispatchHandle* dispatcher = calloc(1, sizeof(SDataDispatchHandle));
  if (NULL == dispatcher) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return TSDB_CODE_FAILED;
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }
  dispatcher->sink.fPut = putDataBlock;
  dispatcher->sink.fGet = getDataBlock;
  dispatcher->sink.fGetLen = getDataLength;
  dispatcher->sink.fGetData = getDataBlock;
  dispatcher->sink.fDestroy = destroyDataSinker;
  dispatcher->pManager = pManager;
  dispatcher->schema = pDataSink->schema;
  dispatcher->status = DS_CAPACITY_ENOUGH;
  dispatcher->pDataBlocks = taosOpenQueue();
  pthread_mutex_init(&dispatcher->mutex, NULL);
  if (NULL == dispatcher->pDataBlocks) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return TSDB_CODE_FAILED;
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }
  *pHandle = dispatcher;
  return TSDB_CODE_SUCCESS;
@@ -17,33 +17,38 @@
#include "dataSinkInt.h"
#include "planner.h"

static SDataSinkManager gDataSinkManager = {0};

int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg) {
  // todo
  gDataSinkManager.cfg = *cfg;
  pthread_mutex_init(&gDataSinkManager.mutex, NULL);
}

int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pHandle) {
  if (DSINK_Dispatch == pDataSink->info.type) {
    return createDataDispatcher(pDataSink, pHandle);
    return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle);
  }
  return TSDB_CODE_FAILED;
}

int32_t dsPutDataBlock(DataSinkHandle handle, const SDataResult* pRes) {
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus) {
  SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
  return pHandleImpl->fPut(pHandleImpl, pRes);
  return pHandleImpl->fPut(pHandleImpl, pInput, pStatus);
}

int32_t dsGetDataLength(DataSinkHandle handle) {
  // todo
}

int32_t dsGetDataBlock(DataSinkHandle handle, char* pData, int32_t* pLen) {
void dsEndPut(DataSinkHandle handle) {
  SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
  return pHandleImpl->fGet(pHandleImpl, pData, pLen);
  return pHandleImpl->fEndPut(pHandleImpl);
}

int32_t dsGetStatus(DataSinkHandle handle) {
  // todo
int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus) {
  SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
  return pHandleImpl->fGetLen(pHandleImpl, pStatus);
}

int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus) {
  SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
  return pHandleImpl->fGetData(pHandleImpl, pOutput, pStatus);
}

void dsScheduleProcess(void* ahandle, void* pItem) {
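For completeness, a hedged setup sketch tying dsDataSinkMgtInit and dsCreateDataSinker together; maxDataBlockNumPerQuery is the one cfg field the dispatcher is seen reading in this commit, while the rest of SDataSinkMgtCfg and the planned SDataSink are assumed to come from the planner:

// Sketch only: initialize the global manager, then create a sinker for a planned data sink.
void demoSinkSetup(const struct SDataSink* pPlannedSink) {
  SDataSinkMgtCfg cfg = {0};
  cfg.maxDataBlockNumPerQuery = 1024;  // caps how many blocks the dispatcher buffers
  dsDataSinkMgtInit(&cfg);

  DataSinkHandle handle = NULL;
  if (dsCreateDataSinker(pPlannedSink, &handle) != TSDB_CODE_SUCCESS) {
    /* only the dispatch sink type is implemented in this commit */
  }
}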
@@ -30,21 +30,17 @@

void* indexQhandle = NULL;

int32_t indexInit() {
void indexInit() {
  // refactor later
  indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
  return indexQhandle == NULL ? -1 : 0;
  // do nothing
}
void indexCleanUp() { taosCleanUpScheduler(indexQhandle); }

static int uidCompare(const void* a, const void* b) {
  // add more version compare
  uint64_t u1 = *(uint64_t*)a;
  uint64_t u2 = *(uint64_t*)b;
  if (u1 == u2) {
    return 0;
  } else {
    return u1 < u2 ? -1 : 1;
  }
  return u1 - u2;
}
typedef struct SIdxColInfo {
  int colId;  // generated by index internal
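A note on the comparator change above: the removed `return u1 - u2;` squeezes a 64-bit unsigned difference into an int, so distinct uids can compare as equal or in the wrong order; the explicit three-way compare avoids that. A small illustration with hypothetical values:

// With u1 = 0 and u2 = (1ULL << 32), u1 - u2 is 0xFFFFFFFF00000000;
// truncated to a 32-bit int this is 0 on typical platforms, i.e. "equal" -- wrong.
// The new form returns -1 here, as intended:
uint64_t u1 = 0, u2 = 1ULL << 32;
int cmp = (u1 == u2) ? 0 : (u1 < u2 ? -1 : 1);  // -1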
@@ -61,7 +57,7 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oTyp
static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);

int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
  // pthread_once(&isInit, indexInit);
  pthread_once(&isInit, indexInit);
  SIndex* sIdx = calloc(1, sizeof(SIndex));
  if (sIdx == NULL) { return -1; }
@@ -71,7 +71,10 @@ TFileCache* tfileCacheCreate(const char* path) {
  }

  TFileReader* reader = tfileReaderCreate(wc);
  if (reader == NULL) { goto End; }
  if (reader == NULL) {
    indexInfo("skip invalid file: %s", file);
    continue;
  }
  TFileHeader* header = &reader->header;
  ICacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName)};
@@ -3628,6 +3628,33 @@ int32_t evaluateSqlNode(SSqlNode* pNode, int32_t tsPrecision, SMsgBuf* pMsgBuf)
  return TSDB_CODE_SUCCESS;
}

int32_t setTableVgroupList(SParseBasicCtx *pCtx, SName* name, SVgroupsInfo **pVgList) {
  SArray* vgroupList = NULL;
  int32_t code = catalogGetTableDistVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, name, &vgroupList);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t vgroupNum = taosArrayGetSize(vgroupList);

  SVgroupsInfo *vgList = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupMsg) * vgroupNum);

  vgList->numOfVgroups = vgroupNum;

  for (int32_t i = 0; i < vgroupNum; ++i) {
    SVgroupInfo *vg = taosArrayGet(vgroupList, i);
    vgList->vgroups[i].vgId = vg->vgId;
    vgList->vgroups[i].numOfEps = vg->numOfEps;
    memcpy(vgList->vgroups[i].epAddr, vg->epAddr, sizeof(vgList->vgroups[i].epAddr));
  }

  *pVgList = vgList;

  taosArrayDestroy(vgroupList);

  return TSDB_CODE_SUCCESS;
}

int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmtInfo* pQueryInfo, char* msgBuf, int32_t msgBufLen) {
  assert(pCtx != NULL && pInfo != NULL);
  int32_t code = 0;
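setTableVgroupList above sizes one allocation for the header plus all entries, which works because SVgroupsInfo is assumed to end in a trailing vgroups array. A stripped-down illustration of the same single-block idiom (all names here are hypothetical):

#include <stdlib.h>

typedef struct SDemoGroup { int32_t id; } SDemoGroup;
typedef struct SDemoList {
  int32_t    num;
  SDemoGroup groups[];  // entries live in the same block as the header
} SDemoList;

SDemoList *demoAllocList(int32_t num) {
  SDemoList *list = calloc(1, sizeof(SDemoList) + sizeof(SDemoGroup) * num);
  if (list != NULL) {
    list->num = num;  // a single free() later releases header and entries together
  }
  return list;
}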
@@ -3916,7 +3943,7 @@ int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmt
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    data.pTableMeta = taosArrayInit(1, POINTER_BYTES);
    taosArrayPush(data.pTableMeta, &pmt);
@@ -3926,6 +3953,12 @@ int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmt
    pQueryInfo->pTableMetaInfo[0]->name = *name;
    pQueryInfo->numOfTables = 1;

    code = setTableVgroupList(pCtx, name, &pQueryInfo->pTableMetaInfo[0]->vgroupList);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(data.pTableMeta);
      return code;
    }

    // evaluate the sqlnode
    STableMeta* pTableMeta = (STableMeta*) taosArrayGetP(data.pTableMeta, 0);
    assert(pTableMeta != NULL);
@@ -237,6 +237,9 @@ void qParserCleanupMetaRequestInfo(SCatalogReq* pMetaReq) {
}

void qDestroyQuery(SQueryNode* pQueryNode) {
  if (NULL == pQueryNode) {
    return;
  }
  if (nodeType(pQueryNode) == TSDB_SQL_INSERT || nodeType(pQueryNode) == TSDB_SQL_CREATE_TABLE) {
    SVnodeModifOpStmtInfo* pModifInfo = (SVnodeModifOpStmtInfo*)pQueryNode;
    taosArrayDestroy(pModifInfo->pDataBlocks);
@@ -160,9 +160,6 @@ static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableI
  return (SPhyNode*)node;
}

static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  return createUserTableScanNode(pPlanNode, pTable, OP_TableScan);
}

static bool isSystemTable(SQueryTableInfo* pTable) {
  // todo
@@ -259,12 +256,20 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) {
  return (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType);
}

static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, SSubplan* subplan) {
  vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[0]), &subplan->execNode);

  return createUserTableScanNode(pPlanNode, pTable, OP_TableScan);
}

static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;

  if (needMultiNodeScan(pTable)) {
    return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable));
  }
  return createSingleTableScanNode(pPlanNode, pTable);
  return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan);
}

static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
@@ -322,12 +327,12 @@ static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
  if (QNODE_MODIFY == pRoot->info.type) {
    splitModificationOpSubPlan(pCxt, pRoot);
  } else {
    SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE);
    SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
    ++(pCxt->nextId.templateId);

    subplan->msgType = TDMT_VND_QUERY;
    subplan->pNode = createPhyNode(pCxt, pRoot);
    subplan->pDataSink = createDataDispatcher(pCxt, pRoot);
    subplan->pDataSink = createDataDispatcher(pCxt, pRoot);
  }
  // todo deal subquery
}
@@ -398,7 +398,7 @@ static bool exprNodeFromJson(const cJSON* json, void* obj) {
    case TEXPR_FUNCTION_NODE:
      return fromObject(json, jkExprNodeFunction, functionFromJson, exprInfo, false);
    case TEXPR_COL_NODE:
      return fromObject(json, jkExprNodeColumn, schemaFromJson, exprInfo->pSchema, false);
      return fromObjectWithAlloc(json, jkExprNodeColumn, schemaFromJson, (void**)&exprInfo->pSchema, sizeof(SSchema), false);
    case TEXPR_VALUE_NODE:
      return fromObject(json, jkExprNodeValue, variantFromJson, exprInfo->pVal, false);
    default:
@@ -37,10 +37,10 @@ enum {
};

typedef struct SSchedulerMgmt {
  uint64_t taskId;
  uint64_t sId;
  uint64_t taskId;  // sequential taskId
  uint64_t sId;     // schedulerId
  SSchedulerCfg cfg;
  SHashObj *jobs;  // key: queryId, value: SQueryJob*
  SHashObj *jobs;  // key: queryId, value: SQueryJob*
} SSchedulerMgmt;

typedef struct SSchCallbackParam {
@@ -83,52 +83,61 @@ typedef struct SSchJobAttr {

typedef struct SSchJob {
  uint64_t queryId;
  int32_t levelNum;
  int32_t levelIdx;
  int8_t status;
  SSchJobAttr attr;
  SEpSet dataSrcEps;
  SEpAddr resEp;
  int32_t levelNum;
  void *transport;
  SArray *nodeList;  // qnode/vnode list, element is SQueryNodeAddr
  tsem_t rspSem;
  int32_t userFetch;
  int32_t remoteFetch;
  SSchTask *fetchTask;

  int32_t errCode;
  void *res;
  int32_t resNumOfRows;
  SArray *levels;    // Element is SQueryLevel, starting from 0. SArray<SSchLevel>
  SArray *subPlans;  // subplan pointer copied from DAG, no need to free it in scheduler

  int32_t levelIdx;
  SEpSet dataSrcEps;
  SHashObj *execTasks;  // executing tasks, key:taskid, value:SQueryTask*
  SHashObj *succTasks;  // succeed tasks, key:taskid, value:SQueryTask*
  SHashObj *failTasks;  // failed tasks, key:taskid, value:SQueryTask*

  SArray *levels;    // Element is SQueryLevel, starting from 0. SArray<SSchLevel>
  SArray *subPlans;  // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. SArray<void*>

  int8_t status;
  SQueryNodeAddr resNode;
  tsem_t rspSem;
  int32_t userFetch;
  int32_t remoteFetch;
  SSchTask *fetchTask;
  int32_t errCode;
  void *res;
  int32_t resNumOfRows;
  SQueryProfileSummary summary;
} SSchJob;

#define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE
#define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children))  // MAY NEED TO ENHANCE
#define SCH_TASK_READY_TO_LUNCH(task) (atomic_load_32(&(task)->childReady) >= taosArrayGetSize((task)->children))
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN)
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY)

#define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__)
#define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__)
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)

#define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st)
#define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status)

#define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) != QUERY_TYPE_MODIFY)
#define SCH_JOB_NEED_FETCH(pAttr) ((pAttr)->queryJob)

#define SCH_JOB_ELOG(param, ...) qError("QID:%"PRIx64" " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("QID:%"PRIx64" " param, pJob->queryId, __VA_ARGS__)

#define SCH_TASK_ELOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) qDebug("QID:%"PRIx64",TID:%"PRIx64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)

#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define SCH_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)

#define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock))
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))

extern int32_t schLaunchTask(SSchJob *job, SSchTask *task);
extern int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType);
static int32_t schLaunchTask(SSchJob *job, SSchTask *task);
static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType);

#ifdef __cplusplus
}
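To show how the error macros above shape control flow, a hedged caller sketch; schDemoRun itself is hypothetical, while the macros, the two sch* declarations, and TDMT_VND_QUERY come from this commit:

// Sketch only: SCH_ERR_RET returns immediately on error, SCH_ERR_JRET jumps to _return.
static int32_t schDemoRun(SSchJob *pJob, SSchTask *pTask) {
  int32_t code = 0;

  SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, TDMT_VND_QUERY));

  SCH_ERR_JRET(schLaunchTask(pJob, pTask));

  return TSDB_CODE_SUCCESS;

_return:
  SCH_TASK_ELOG("demo step failed, code:%d", code);
  SCH_RET(code);
}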
File diff suppressed because it is too large.
@@ -112,6 +112,13 @@ bool taosQueueEmpty(STaosQueue *queue) {
  return empty;
}

int32_t taosQueueSize(STaosQueue *queue) {
  pthread_mutex_lock(&queue->mutex);
  int32_t numOfItems = queue->numOfItems;
  pthread_mutex_unlock(&queue->mutex);
  return numOfItems;
}

void *taosAllocateQitem(int32_t size) {
  STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1);