commit
624d62bade
|
@ -213,8 +213,6 @@ typedef struct SDataBlockList {
|
|||
int32_t idx;
|
||||
uint32_t nSize;
|
||||
uint32_t nAlloc;
|
||||
char * userParam; /* user assigned parameters for async query */
|
||||
void * udfp; /* user defined function pointer, used in async model */
|
||||
STableDataBlocks **pData;
|
||||
} SDataBlockList;
|
||||
|
||||
|
@ -451,7 +449,6 @@ void tscCloseTscObj(STscObj *pObj);
|
|||
|
||||
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen);
|
||||
|
||||
void tscProcessMultiVnodesInsert(SSqlObj *pSql);
|
||||
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql);
|
||||
void tscKillMetricQuery(SSqlObj *pSql);
|
||||
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
|
||||
|
|
|
@ -77,7 +77,7 @@ int32_t main(int32_t argc, char *argv[]) {
|
|||
}
|
||||
|
||||
/* Set termination handler. */
|
||||
struct sigaction act;
|
||||
struct sigaction act = {0};
|
||||
act.sa_flags = SA_SIGINFO;
|
||||
act.sa_sigaction = signal_handler;
|
||||
sigaction(SIGTERM, &act, NULL);
|
||||
|
|
|
@ -290,8 +290,9 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) {
|
|||
if (qHasMoreResultsToRetrieve(pQInfo)) {
|
||||
dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg);
|
||||
} else { // no further execution invoked, release the ref to vnode
|
||||
dnodeProcessReadResult(pVnode, pMsg);
|
||||
//vnodeRelease(pVnode);
|
||||
qDestroyQueryInfo(pQInfo);
|
||||
// dnodeProcessReadResult(pVnode, pMsg);
|
||||
vnodeRelease(pVnode);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -305,5 +306,4 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) {
|
|||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
dTrace("dnode retrieve msg disposed, thandle:%p", pMsg->rpcMsg.handle);
|
||||
vnodeRelease(pVnode);
|
||||
}
|
||||
|
|
|
@ -198,6 +198,12 @@ typedef struct SQInfo {
|
|||
*/
|
||||
int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo);
|
||||
|
||||
/**
|
||||
* destroy the query info struct
|
||||
* @param pQInfo
|
||||
*/
|
||||
void qDestroyQueryInfo(SQInfo* pQInfo);
|
||||
|
||||
/**
|
||||
* query on single table
|
||||
* @param pReadMsg
|
||||
|
|
|
@ -1566,6 +1566,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
}
|
||||
|
||||
destroyResultBuf(pRuntimeEnv->pResultBuf);
|
||||
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
|
||||
pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf);
|
||||
}
|
||||
|
||||
|
@ -2244,11 +2245,8 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) {
|
|||
}
|
||||
|
||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
|
||||
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
|
||||
|
||||
// tSidSetDestroy(&pQInfo->pSidSet);
|
||||
|
||||
if (pQInfo->pTableDataInfo != NULL) {
|
||||
// size_t num = taosHashGetSize(pQInfo->pTableIdList);
|
||||
for (int32_t j = 0; j < 0; ++j) {
|
||||
|
@ -4202,8 +4200,9 @@ int32_t doInitializeQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTable
|
|||
}
|
||||
|
||||
pRuntimeEnv->pQueryHandle = tsdbQueryByTableId(tsdb, &cond, pQInfo->pTableIdList, cols);
|
||||
taosArrayDestroy(cols);
|
||||
|
||||
pRuntimeEnv->pQuery = pQuery;
|
||||
|
||||
pRuntimeEnv->pTSBuf = param;
|
||||
pRuntimeEnv->cur.vnodeIndex = -1;
|
||||
if (param != NULL) {
|
||||
|
@ -5444,13 +5443,11 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
|
|||
*tagCond = calloc(1, pQueryMsg->tagCondLen * TSDB_NCHAR_SIZE);
|
||||
memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen * TSDB_NCHAR_SIZE);
|
||||
}
|
||||
|
||||
dTrace("qmsg:%p query on %d meter(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, numOfTagCols:%d, "
|
||||
"timestamp order:%d, tags order:%d, tags order col:%d, numOfOutputCols:%d, numOfCols:%d, interval:%" PRId64
|
||||
", fillType:%d, comptslen:%d, limit:%" PRId64 ", offset:%" PRId64,
|
||||
|
||||
dTrace("qmsg:%p query on %d table(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, ts order:%d, "
|
||||
"outputCols:%d, numOfCols:%d, interval:%d" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 ", offset:%" PRId64,
|
||||
pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey,
|
||||
pQueryMsg->numOfGroupCols, pQueryMsg->order, pQueryMsg->orderType,
|
||||
pQueryMsg->orderByIdx, pQueryMsg->numOfOutputCols,
|
||||
pQueryMsg->numOfGroupCols, pQueryMsg->order, pQueryMsg->numOfOutputCols,
|
||||
pQueryMsg->numOfCols, pQueryMsg->intervalTime, pQueryMsg->interpoType, pQueryMsg->tsLen,
|
||||
pQueryMsg->limit, pQueryMsg->offset);
|
||||
|
||||
|
@ -5974,6 +5971,8 @@ static void freeQInfo(SQInfo *pQInfo) {
|
|||
tfree(pQuery->pGroupbyExpr);
|
||||
tfree(pQuery);
|
||||
|
||||
taosArrayDestroy(pQInfo->pTableIdList);
|
||||
|
||||
dTrace("QInfo:%p QInfo is freed", pQInfo);
|
||||
|
||||
// destroy signature, in order to avoid the query process pass the object safety check
|
||||
|
@ -6120,6 +6119,10 @@ _query_over:
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void qDestroyQueryInfo(SQInfo* pQInfo) {
|
||||
freeQInfo(pQInfo);
|
||||
}
|
||||
|
||||
void qTableQuery(SQInfo *pQInfo) {
|
||||
if (pQInfo == NULL || pQInfo->signature != pQInfo) {
|
||||
dTrace("%p freed abort query", pQInfo);
|
||||
|
@ -6133,6 +6136,9 @@ void qTableQuery(SQInfo *pQInfo) {
|
|||
|
||||
dTrace("QInfo:%p query task is launched", pQInfo);
|
||||
|
||||
// sem_post(&pQInfo->dataReady);
|
||||
// pQInfo->runtimeEnv.pQuery->status = QUERY_OVER;
|
||||
|
||||
int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList);
|
||||
if (numOfTables == 1) {
|
||||
singleTableQueryImpl(pQInfo);
|
||||
|
@ -6212,7 +6218,6 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c
|
|||
|
||||
if (isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) {
|
||||
(*pRsp)->completed = 1; // notify no more result to client
|
||||
freeQInfo(pQInfo);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
|
|
@ -347,7 +347,7 @@ void tprintf(const char *const flags, int dflag, const char *const format, ...)
|
|||
va_start(argpointer, format);
|
||||
int writeLen = vsnprintf(buffer + len, MAX_LOGLINE_CONTENT_SIZE, format, argpointer);
|
||||
if (writeLen <= 0) {
|
||||
char tmp[MAX_LOGLINE_DUMP_BUFFER_SIZE];
|
||||
char tmp[MAX_LOGLINE_DUMP_BUFFER_SIZE] = {0};
|
||||
writeLen = vsnprintf(tmp, MAX_LOGLINE_DUMP_CONTENT_SIZE, format, argpointer);
|
||||
strncpy(buffer + len, tmp, MAX_LOGLINE_CONTENT_SIZE);
|
||||
len += MAX_LOGLINE_CONTENT_SIZE;
|
||||
|
|
|
@ -337,6 +337,12 @@ SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle);
|
|||
*/
|
||||
SArray *tsdbQueryTableList(tsdb_repo_t* tsdb, int64_t uid, const wchar_t *pTagCond, size_t len);
|
||||
|
||||
/**
|
||||
* clean up the query handle
|
||||
* @param queryHandle
|
||||
*/
|
||||
void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -282,7 +282,6 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond
|
|||
pQueryHandle->window = pCond->twindow;
|
||||
pQueryHandle->pTsdb = tsdb;
|
||||
|
||||
pQueryHandle->pColumns = pColumnInfo;
|
||||
pQueryHandle->loadDataAfterSeek = false;
|
||||
pQueryHandle->isFirstSlot = true;
|
||||
|
||||
|
@ -331,9 +330,6 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond
|
|||
vnodeInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
|
||||
vnodeInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo);
|
||||
|
||||
int32_t vnodeId = 1;
|
||||
vnodeRecordAllFiles(vnodeId, &pQueryHandle->vnodeFileInfo);
|
||||
|
||||
return (tsdb_query_handle_t)pQueryHandle;
|
||||
}
|
||||
|
||||
|
@ -468,6 +464,7 @@ static bool loadQualifiedDataFromFileBlock(STsdbQueryHandle *pQueryHandle) {
|
|||
}
|
||||
}
|
||||
|
||||
taosArrayDestroy(sa);
|
||||
return pQueryHandle->realNumOfRows > 0;
|
||||
}
|
||||
|
||||
|
@ -751,7 +748,7 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
|
|||
int32_t tid = pCheckInfo->tableId.tid;
|
||||
|
||||
while (pCheckInfo->pFileGroup != NULL) {
|
||||
if ((fid = getFileCompInfo(pCheckInfo, pCheckInfo->pFileGroup)) < 0) {
|
||||
if (getFileCompInfo(pCheckInfo, pCheckInfo->pFileGroup) != TSDB_CODE_SUCCESS) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -761,7 +758,6 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
|
|||
pCheckInfo->pFileGroup->fileId, tid);
|
||||
|
||||
pCheckInfo->pFileGroup = tsdbGetFileGroupNext(&pCheckInfo->fileIter);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -790,7 +786,7 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
|
|||
assert(index >= 0 && index < pCheckInfo->compIndex[tid].numOfSuperBlocks);
|
||||
|
||||
// load first data block into memory failed, caused by disk block error
|
||||
bool blockLoaded = false;
|
||||
bool blockLoaded = false;
|
||||
SArray *sa = getDefaultLoadColumns(pQueryHandle, true);
|
||||
|
||||
// todo no need to loaded at all
|
||||
|
@ -810,8 +806,7 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
|
|||
pFile->fd = open(pFile->fname, O_RDONLY);
|
||||
}
|
||||
|
||||
if (tsdbLoadDataBlock(pFile, &pCheckInfo->pCompInfo->blocks[cur->slot], 1,
|
||||
pCheckInfo->pDataCols, data) == 0) {
|
||||
if (tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data) == 0) {
|
||||
blockLoaded = true;
|
||||
}
|
||||
|
||||
|
@ -825,12 +820,19 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
|
|||
|
||||
// todo search qualified points in blk, according to primary key (timestamp) column
|
||||
SDataCols* pDataCols = pCheckInfo->pDataCols;
|
||||
|
||||
TSKEY* d = (TSKEY*) pDataCols->cols[PRIMARYKEY_TIMESTAMP_COL_INDEX].pData;
|
||||
assert(d[0] == pBlock->keyFirst && d[pBlock->numOfPoints - 1] == pBlock->keyLast);
|
||||
|
||||
cur->pos = binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfPoints, key, pQueryHandle->order);
|
||||
|
||||
cur->fid = pCheckInfo->pFileGroup->fileId;
|
||||
assert(cur->pos >= 0 && cur->fid >= 0 && cur->slot >= 0);
|
||||
|
||||
filterDataInDataBlock(pQueryHandle, pCheckInfo->pDataCols, sa);
|
||||
|
||||
taosArrayDestroy(sa);
|
||||
tfree(data);
|
||||
return pQueryHandle->realNumOfRows > 0;
|
||||
}
|
||||
|
||||
|
@ -838,8 +840,6 @@ static bool hasMoreDataInFileForSingleTableModel(STsdbQueryHandle* pHandle) {
|
|||
assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1);
|
||||
|
||||
STsdbFileH* pFileHandle = tsdbGetFile(pHandle->pTsdb);
|
||||
// SQueryFilePos* cur = &pHandle->cur;
|
||||
|
||||
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
|
||||
|
||||
if (!pCheckInfo->checkFirstFileBlock) {
|
||||
|
@ -1351,3 +1351,34 @@ SArray *tsdbQueryTableList(tsdb_repo_t* tsdb, int64_t uid, const wchar_t *pTagCo
|
|||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) {
|
||||
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) queryHandle;
|
||||
|
||||
|
||||
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||
for(int32_t i = 0; i < size; ++i) {
|
||||
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
|
||||
tSkipListDestroyIter(pTableCheckInfo->iter);
|
||||
|
||||
tfree(pTableCheckInfo->pDataCols->buf);
|
||||
tfree(pTableCheckInfo->pDataCols);
|
||||
|
||||
tfree(pTableCheckInfo->pCompInfo);
|
||||
tfree(pTableCheckInfo->compIndex);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pQueryHandle->pTableCheckInfo);
|
||||
|
||||
size_t cols = taosArrayGetSize(pQueryHandle->pColumns);
|
||||
for(int32_t i = 0; i < cols; ++i) {
|
||||
SColumnInfoEx *pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
|
||||
tfree(pColInfo->pData);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pQueryHandle->pColumns);
|
||||
|
||||
tfree(pQueryHandle->unzipBuffer);
|
||||
tfree(pQueryHandle->secondaryUnzipBuffer);
|
||||
tfree(pQueryHandle);
|
||||
}
|
||||
|
|
|
@ -24,17 +24,43 @@
|
|||
|
||||
void taosMsleep(int mseconds);
|
||||
|
||||
static int32_t doQuery(TAOS* taos, const char* sql) {
|
||||
int32_t code = taos_query(taos, sql);
|
||||
if (code != 0) {
|
||||
printf("failed to execute query, reason:%s\n", taos_errstr(taos));
|
||||
return -1;
|
||||
}
|
||||
|
||||
TAOS_RES* res = taos_use_result(taos);
|
||||
TAOS_ROW row = NULL;
|
||||
char buf[512] = {0};
|
||||
|
||||
int32_t numOfFields = taos_num_fields(res);
|
||||
TAOS_FIELD* pFields = taos_fetch_fields(res);
|
||||
|
||||
while((row = taos_fetch_row(res)) != NULL) {
|
||||
taos_print_row(buf, row, pFields, numOfFields);
|
||||
printf("%s\n", buf);
|
||||
memset(buf, 0, 512);
|
||||
}
|
||||
|
||||
taos_free_result(res);
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
TAOS * taos;
|
||||
char qstr[1024];
|
||||
TAOS_RES *result;
|
||||
|
||||
|
||||
// connect to server
|
||||
if (argc < 2) {
|
||||
printf("please input server-ip \n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
taos_options(TSDB_OPTION_CONFIGDIR, "~/sec/cfg");
|
||||
|
||||
// init TAOS
|
||||
taos_init();
|
||||
|
||||
|
@ -45,6 +71,22 @@ int main(int argc, char *argv[]) {
|
|||
}
|
||||
printf("success to connect to server\n");
|
||||
|
||||
doQuery(taos, "create database if not exists test");
|
||||
doQuery(taos, "use test");
|
||||
doQuery(taos, "create table if not exists tm0 (ts timestamp, k int);");
|
||||
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:1', 1);");
|
||||
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:2', 2);");
|
||||
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:3', 3);");
|
||||
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:4', 4);");
|
||||
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:5', 5);");
|
||||
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:6', 6);");
|
||||
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:7', 7);");
|
||||
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:8', 8);");
|
||||
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:9', 9);");
|
||||
doQuery(taos, "select * from tm0;");
|
||||
|
||||
taos_close(taos);
|
||||
return 0;
|
||||
|
||||
taos_query(taos, "drop database demo");
|
||||
if (taos_query(taos, "create database demo") != 0) {
|
||||
|
@ -53,8 +95,10 @@ int main(int argc, char *argv[]) {
|
|||
}
|
||||
printf("success to create database\n");
|
||||
|
||||
|
||||
taos_query(taos, "use demo");
|
||||
|
||||
|
||||
// create table
|
||||
if (taos_query(taos, "create table m1 (ts timestamp, speed int)") != 0) {
|
||||
printf("failed to create table, reason:%s\n", taos_errstr(taos));
|
||||
|
@ -62,9 +106,11 @@ int main(int argc, char *argv[]) {
|
|||
}
|
||||
printf("success to create table\n");
|
||||
|
||||
|
||||
// sleep for one second to make sure table is created on data node
|
||||
// taosMsleep(1000);
|
||||
|
||||
|
||||
// insert 10 records
|
||||
int i = 0;
|
||||
for (i = 0; i < 10; ++i) {
|
||||
|
@ -76,6 +122,7 @@ int main(int argc, char *argv[]) {
|
|||
}
|
||||
printf("success to insert rows, total %d rows\n", i);
|
||||
|
||||
|
||||
// query the records
|
||||
sprintf(qstr, "SELECT * FROM m1");
|
||||
if (taos_query(taos, qstr) != 0) {
|
||||
|
@ -83,12 +130,16 @@ int main(int argc, char *argv[]) {
|
|||
exit(1);
|
||||
}
|
||||
|
||||
|
||||
result = taos_use_result(taos);
|
||||
|
||||
|
||||
if (result == NULL) {
|
||||
printf("failed to get result, reason:%s\n", taos_errstr(taos));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
// TAOS_ROW row;
|
||||
|
||||
TAOS_ROW row;
|
||||
int rows = 0;
|
||||
|
@ -96,6 +147,7 @@ int main(int argc, char *argv[]) {
|
|||
TAOS_FIELD *fields = taos_fetch_fields(result);
|
||||
char temp[256];
|
||||
|
||||
|
||||
printf("select * from table, result:\n");
|
||||
// fetch the records row by row
|
||||
while ((row = taos_fetch_row(result))) {
|
||||
|
|
Loading…
Reference in New Issue