From 305523f47ab9710f81624590fa3144c6e25a4c0a Mon Sep 17 00:00:00 2001 From: hzcheng Date: Tue, 24 Mar 2020 15:10:41 +0800 Subject: [PATCH 1/5] TD-34 --- src/vnode/tsdb/inc/tsdbFile.h | 5 ---- src/vnode/tsdb/src/tsdbMain.c | 50 ++++++++++++++++++++++++++++------- 2 files changed, 41 insertions(+), 14 deletions(-) diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index 7f3acb6624..385d1f59ea 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -36,11 +36,6 @@ typedef enum { extern const char *tsdbFileSuffix[]; -typedef struct { - int64_t size; - int64_t tombSize; -} SFileInfo; - typedef struct { int8_t type; char fname[128]; diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 2df7974844..afd57947b7 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -782,6 +782,39 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max return numOfRows; } +static void tsdbDestroyTableIters(SSkipListIterator **iters, int maxTables) { + if (iters == NULL) return; + + for (int tid = 0; tid < maxTables; tid++) { + if (iters[tid] == NULL) continue; + tSkipListDestroy(iters[tid]); + } + + free(iters); +} + +static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables) { + SSkipListIterator **iters = (SSkipListIterator *)calloc(maxTables, sizeof(SSkipListIterator *)); + if (iters == NULL) return NULL; + + for (int tid = 0; tid < maxTables; tid++) { + STable *pTable = pMeta->tables[tid]; + if (pTable == NULL || pTable->imem == NULL) continue; + + iters[tid] = tSkipListCreateIter(pTable->imem->pData); + if (iters[tid] == NULL) { + tsdbDestroyTableIters(iters, maxTables); + return NULL; + } + + if (!tSkipListIterNext(iters[tid])) { + assert(false); + } + } + + return iters; +} + // Commit to file static void *tsdbCommitToFile(void *arg) { // TODO @@ -791,10 +824,8 @@ static void *tsdbCommitToFile(void *arg) { STsdbCfg * pCfg = &(pRepo->config); if (pCache->imem == NULL) return; - int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision); - int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision); - - SSkipListIterator **iters = (SSkipListIterator **)calloc(pCfg->maxTables, sizeof(SSkipListIterator *)); + // Create the iterator to read from cache + SSkipListIterator **iters = tsdbCreateTableIters(pMeta, pCfg->maxTables); if (iters == NULL) { // TODO: deal with the error return NULL; @@ -805,10 +836,15 @@ static void *tsdbCommitToFile(void *arg) { SDataCol **cols = (SDataCol **)malloc(sizeof(SDataCol *) * maxCols); void *buf = malloc((maxBytes + sizeof(SDataCol)) * pCfg->maxRowsPerFileBlock); + int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision); + int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision); + for (int fid = sfid; fid <= efid; fid++) { TSKEY minKey = 0, maxKey = 0; tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); + // tsdbOpenFileForWrite(pRepo, fid); + for (int tid = 0; tid < pCfg->maxTables; tid++) { STable *pTable = pMeta->tables[tid]; if (pTable == NULL || pTable->imem == NULL) continue; @@ -837,14 +873,10 @@ static void *tsdbCommitToFile(void *arg) { } } - // Free the iterator - for (int tid = 0; tid < pCfg->maxTables; tid++) { - if (iters[tid] != NULL) tSkipListDestroyIter(iters[tid]); - } + tsdbDestroyTableIters(iters, pCfg->maxTables); free(buf); free(cols); - free(iters); tsdbLockRepo(arg); tdListMove(pCache->imem->list, pCache->pool.memPool); From de5fb15206036f8d4eb575e2379e022aef978c91 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 24 Mar 2020 16:13:11 +0800 Subject: [PATCH 2/5] [td-32] fix bugs and refactor codes --- src/client/inc/tsclient.h | 9 ++- src/client/src/tscServer.c | 14 ++-- src/client/src/tscSql.c | 3 +- src/dnode/src/dnodeRead.c | 30 ++++--- src/query/inc/queryExecutor.h | 12 +-- src/query/src/queryExecutor.c | 145 ++++++++++++++++------------------ src/util/src/shash.c | 2 - 7 files changed, 103 insertions(+), 112 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 2dd580dda0..fdc0ae9095 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -294,20 +294,21 @@ typedef struct SResRec { struct STSBuf; typedef struct { - int32_t code; int64_t numOfRows; // num of results in current retrieved int64_t numOfTotal; // num of total results int64_t numOfTotalInCurrentClause; // num of total result in current subclause char * pRsp; - int rspType; - int rspLen; + int32_t rspType; + int32_t rspLen; uint64_t qhandle; int64_t uid; int64_t useconds; int64_t offset; // offset value from vnode during projection query of stable - int row; + int32_t row; int16_t numOfCols; int16_t precision; + bool completed; + int32_t code; int32_t numOfGroups; SResRec * pGroupRec; char * data; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 605c7a22cc..348c0709f1 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -209,7 +209,6 @@ int tscSendMsgToServer(SSqlObj *pSql) { } void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { - tscTrace("response:%s is received, len:%d error:%s", taosMsg[rpcMsg->msgType], rpcMsg->contLen, tstrerror(rpcMsg->code)); SSqlObj *pSql = (SSqlObj *)rpcMsg->handle; if (pSql == NULL || pSql->signature != pSql) { tscError("%p sql is already released, signature:%p", pSql, pSql->signature); @@ -256,7 +255,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { rpcFreeCont(rpcMsg->pCont); return; } else { - tscTrace("%p it shall renew meter meta, code:%d", pSql, rpcMsg->code); + tscTrace("%p it shall renew meter meta, code:%d", pSql, tstrerror(rpcMsg->code)); pSql->maxRetry = TSDB_VNODES_SUPPORT * 2; pSql->res.code = rpcMsg->code; // keep the previous error code @@ -278,7 +277,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { if (pRes->code != TSDB_CODE_QUERY_CANCELLED) { pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_NETWORK_UNAVAIL; } else { - tscTrace("%p query is cancelled, code:%d", pSql, pRes->code); + tscTrace("%p query is cancelled, code:%d", pSql, tstrerror(pRes->code)); } if (pRes->code != TSDB_CODE_QUERY_CANCELLED) { @@ -318,19 +317,17 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code, *(int32_t *)pRes->pRsp, pRes->rspLen); } else { - tscTrace("%p cmd:%d code:%d rsp len:%d", pSql, pCmd->command, pRes->code, pRes->rspLen); + tscTrace("%p cmd:%d code:%s rsp len:%d", pSql, pCmd->command, tstrerror(pRes->code), pRes->rspLen); } } if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql); - + if (rpcMsg->code != TSDB_CODE_ACTION_IN_PROGRESS) { int command = pCmd->command; void *taosres = tscKeepConn[command] ? pSql : NULL; - rpcMsg->code = pRes->code ? -pRes->code : pRes->numOfRows; - - tscTrace("%p Async SQL result:%d res:%p", pSql, rpcMsg->code, taosres); + tscTrace("%p Async SQL result:%s res:%p", pSql, tstrerror(pRes->code), taosres); /* * Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj @@ -2304,6 +2301,7 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { pRes->precision = htons(pRetrieve->precision); pRes->offset = htobe64(pRetrieve->offset); pRes->useconds = htobe64(pRetrieve->useconds); + pRes->completed = (pRetrieve->completed == 1); pRes->data = pRetrieve->data; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 63612d0f5f..fa50551548 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -729,7 +729,8 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - if (pRes->qhandle == 0 || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pCmd->command == TSDB_SQL_INSERT) { + if (pRes->qhandle == 0 || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pCmd->command == TSDB_SQL_INSERT || + pRes->completed) { return NULL; } diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 9e50804583..3419128c72 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -93,8 +93,8 @@ void dnodeRead(SRpcMsg *pMsg) { while (leftLen > 0) { SMsgHead *pHead = (SMsgHead *) pCont; - pHead->vgId = 1;//htonl(pHead->vgId); - pHead->contLen = pMsg->contLen; //htonl(pHead->contLen); + pHead->vgId = htonl(pHead->vgId); + pHead->contLen = htonl(pHead->contLen); void *pVnode = dnodeGetVnode(pHead->vgId); if (pVnode == NULL) { @@ -104,12 +104,13 @@ void dnodeRead(SRpcMsg *pMsg) { } // put message into queue - SReadMsg readMsg; - readMsg.rpcMsg = *pMsg; - readMsg.pCont = pCont; - readMsg.contLen = pHead->contLen; - readMsg.pRpcContext = pRpcContext; - readMsg.pVnode = pVnode; + SReadMsg readMsg = { + .rpcMsg = *pMsg, + .pCont = pCont, + .contLen = pHead->contLen, + .pRpcContext = pRpcContext, + .pVnode = pVnode, + }; taos_queue queue = dnodeGetVnodeRworker(pVnode); taosWriteQitem(queue, &readMsg); @@ -177,8 +178,6 @@ static void *dnodeProcessReadQueue(void *param) { } else { terrno = TSDB_CODE_MSG_NOT_PROCESSED; } - - dnodeProcessReadResult(&readMsg); } return NULL; @@ -252,17 +251,19 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) { qTableQuery(pQInfo); } +static int32_t c = 0; static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { SRetrieveTableMsg *pRetrieve = pMsg->pCont; void *pQInfo = (void*) htobe64(pRetrieve->qhandle); dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId); - + if ((++c)%2 == 0) { + int32_t k = 1; + } int32_t rowSize = 0; int32_t numOfRows = 0; int32_t contLen = 0; - SRpcMsg rpcRsp = {0}; SRetrieveTableRsp *pRsp = NULL; int32_t code = qRetrieveQueryResultInfo(pQInfo, &numOfRows, &rowSize); @@ -276,7 +277,7 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen); } - rpcRsp = (SRpcMsg) { + SRpcMsg rpcRsp = (SRpcMsg) { .handle = pMsg->rpcMsg.handle, .pCont = pRsp, .contLen = contLen, @@ -285,4 +286,7 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { }; rpcSendResponse(&rpcRsp); + + //todo merge result should be done here + //dnodeProcessReadResult(&readMsg); } diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index b10d869780..4ce606f599 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -33,7 +33,7 @@ typedef struct SData { } SData; enum { - ST_QUERY_KILLED = 0, // query killed +// ST_QUERY_KILLED = 0, // query killed ST_QUERY_PAUSED = 1, // query paused, due to full of the response buffer ST_QUERY_COMPLETED = 2, // query completed }; @@ -142,8 +142,8 @@ typedef struct SQuery { SResultRec rec; int32_t pos; int64_t pointsOffset; // the number of points offset to save read data - SData** sdata; - int32_t capacity; + SData** sdata; + int32_t capacity; SSingleColumnFilterInfo* pFilterInfo; } SQuery; @@ -170,14 +170,14 @@ typedef struct SQueryRuntimeEnv { } SQueryRuntimeEnv; typedef struct SQInfo { - uint64_t signature; + void* signature; void* pVnode; TSKEY startTime; - int64_t elapsedTime; + TSKEY elapsedTime; SResultRec rec; int32_t pointsInterpo; int32_t code; // error code to returned to client - int32_t killed; // denotes if current query is killed +// int32_t killed; // denotes if current query is killed sem_t dataReady; SArray* pTableIdList; // table list SQueryRuntimeEnv runtimeEnv; diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 696d8c37c9..cbce5097ec 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -64,38 +64,24 @@ typedef struct SPointInterpoSupporter { } SPointInterpoSupporter; typedef enum { - - /* - * the program will call this function again, if this status is set. - * used to transfer from QUERY_RESBUF_FULL - */ + // when query starts to execute, this status will set QUERY_NOT_COMPLETED = 0x1u, - /* - * output buffer is full, so, the next query will be employed, - * in this case, we need to set the appropriated start scan point for - * the next query. - * - * this status is only exist in group-by clause and - * diff/add/division/multiply/ query. + /* result output buffer is full, current query is paused. + * this status is only exist in group-by clause and diff/add/division/multiply/ query. */ QUERY_RESBUF_FULL = 0x2u, - /* - * query is over - * 1. this status is used in one row result query process, e.g., - * count/sum/first/last/ - * avg...etc. - * 2. when the query range on timestamp is satisfied, it is also denoted as - * query_compeleted + /* query is over + * 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc. + * 2. when all data within queried time window, it is also denoted as query_completed */ QUERY_COMPLETED = 0x4u, - - /* - * all data has been scanned, so current search is stopped, - * At last, the function will transfer this status to QUERY_COMPLETED + + /* when the result is not completed return to client, this status will be + * usually used in case of interval query with interpolation option */ - QUERY_NO_DATA_TO_CHECK = 0x8u, + QUERY_OVER = 0x8u, } vnodeQueryStatus; static void setQueryStatus(SQuery *pQuery, int8_t status); @@ -1301,7 +1287,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat if (pRuntimeEnv->pTSBuf != NULL) { // if timestamp filter list is empty, quit current query if (!tsBufNextPos(pRuntimeEnv->pTSBuf)) { - setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); + setQueryStatus(pQuery, QUERY_COMPLETED); break; } } @@ -1621,10 +1607,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf); } -bool isQueryKilled(SQuery *pQuery) { - return false; - - SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); +static bool isQueryKilled(SQuery *pQuery) { #if 0 /* * check if the queried meter is going to be deleted. @@ -1638,9 +1621,14 @@ bool isQueryKilled(SQuery *pQuery) { return (pQInfo->killed == 1); #endif + return 0; } +static bool setQueryKilled(SQInfo* pQInfo) { + pQInfo->code = TSDB_CODE_QUERY_CANCELLED; +} + bool isFixedOutputQuery(SQuery *pQuery) { if (pQuery->intervalTime != 0) { return false; @@ -2664,7 +2652,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { while (tsdbNextDataBlock(pQueryHandle)) { // check if query is killed or not set the status of query to pass the status check if (isQueryKilled(pQuery)) { - setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return cnt; } @@ -2714,7 +2701,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { } if (isIntervalQuery(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) { - if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { + if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { int32_t step = QUERY_IS_ASC_QUERY(pQuery) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP; closeAllTimeWindow(&pRuntimeEnv->windowResInfo); @@ -3631,7 +3618,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { /* check if query is killed or not */ if (isQueryKilled(pQuery)) { - setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); +// setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; } } @@ -4111,7 +4098,7 @@ bool vnodeHasRemainResults(void *handle) { } // query has completed - if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { + if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->intervalTime, pQuery->slidingTimeUnit, pQuery->precision); // int32_t numOfTotal = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY @@ -4272,7 +4259,7 @@ int32_t initQInfo(SQInfo *pQInfo, void *param, void* tsdb) { pQuery->window.ekey, pQuery->order.order); sem_post(&pQInfo->dataReady); - pQInfo->killed = 1; + setQueryStatus(pQuery, QUERY_COMPLETED); return TSDB_CODE_SUCCESS; } @@ -5024,7 +5011,7 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) { // since the numOfOutputElems must be identical for all sql functions that are allowed to be executed simutanelously. pQuery->rec.pointsRead = getNumOfResult(pRuntimeEnv); // assert(pQuery->pointsRead <= pQuery->pointsToRead && - // Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)); + // Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED)); // must be top/bottom query if offset > 0 if (pQuery->limit.offset > 0) { @@ -5128,7 +5115,7 @@ static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) { pQuery->limit.offset -= c; } - if (Q_STATUS_EQUAL(pQuery->status, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) { + if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { break; } @@ -5178,7 +5165,7 @@ static void tableIntervalProcessor(SQInfo *pQInfo) { pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pInterpoBuf, pQuery->rec.pointsRead, &numOfInterpo); dTrace("QInfo: %p interpo completed, final:%d", pQInfo, pQuery->rec.pointsRead); - if (pQuery->rec.pointsRead > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { + if (pQuery->rec.pointsRead > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { doRevisedResultsByLimit(pQInfo); break; } @@ -5206,17 +5193,20 @@ static void tableIntervalProcessor(SQInfo *pQInfo) { } void qTableQuery(SQInfo *pQInfo) { - assert(pQInfo != NULL); - - if (pQInfo->killed) { + if (pQInfo == NULL || pQInfo->signature != pQInfo) { + dTrace("%p freed abort query", pQInfo); + return; + } + + SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; + + SQuery *pQuery = pRuntimeEnv->pQuery; + if (isQueryKilled(pQuery)) { dTrace("QInfo:%p it is already killed, abort", pQInfo); return; } - - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery * pQuery = pRuntimeEnv->pQuery; - - // dTrace("vid:%d sid:%d id:%s, query thread is created, numOfQueries:%d, QInfo:%p", pQInfo); + + dTrace("QInfo:%p query task is launched", pQInfo); if (vnodeHasRemainResults(pQInfo)) { /* @@ -5242,7 +5232,7 @@ void qTableQuery(SQInfo *pQInfo) { } // here we have scan all qualified data in both data file and cache - if (Q_STATUS_EQUAL(pQuery->status, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) { + if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { // continue to get push data from the group result if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->intervalTime > 0 && pQInfo->rec.pointsTotal < pQuery->limit.limit)) { @@ -5303,10 +5293,8 @@ void qTableQuery(SQInfo *pQInfo) { /* check if query is killed or not */ if (isQueryKilled(pQuery)) { dTrace("QInfo:%p query is killed", pQInfo); - // pQInfo->over = 1; } else { - // dTrace("QInfo:%p vid:%d sid:%d id:%s, meter query thread completed, %d points are returned", pQInfo, - // pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead); + dTrace("QInfo:%p query task completed, %d points are returned", pQInfo, pQuery->rec.pointsRead); } sem_post(&pQInfo->dataReady); @@ -5989,21 +5977,16 @@ bool isQInfoValid(void *param) { return (sig == (uint64_t)pQInfo); } -void vnodeFreeQInfo(SQInfo *pQInfo, bool decQueryRef) { +void vnodeFreeQInfo(SQInfo *pQInfo) { if (!isQInfoValid(pQInfo)) { return; } - pQInfo->killed = 1; + SQuery* pQuery = pQInfo->runtimeEnv.pQuery; + setQueryKilled(pQInfo); + dTrace("QInfo:%p start to free SQInfo", pQInfo); - - if (decQueryRef) { - vnodeDecMeterRefcnt(pQInfo); - } - - SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - - for (int col = 0; col < pQuery->numOfOutputCols; ++col) { + for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { tfree(pQuery->sdata[col]); } @@ -6049,7 +6032,7 @@ void vnodeFreeQInfo(SQInfo *pQInfo, bool decQueryRef) { tfree(pQuery->pGroupbyExpr); tfree(pQuery); - // dTrace("QInfo:%p vid:%d sid:%d meterId:%s, QInfo is freed", pQInfo, pObj->vnode, pObj->sid, pObj->meterId); + dTrace("QInfo:%p QInfo is freed", pQInfo); // destroy signature, in order to avoid the query process pass the object safety check memset(pQInfo, 0, sizeof(SQInfo)); @@ -6105,7 +6088,7 @@ static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyE _error: // table query ref will be decrease during error handling - vnodeFreeQInfo(*pQInfo, false); + vnodeFreeQInfo(*pQInfo); return code; } @@ -6176,28 +6159,25 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo, int32_t *numOfRows, int32_t *ro if (pQInfo == NULL || !isQInfoValid(pQInfo)) { return TSDB_CODE_INVALID_QHANDLE; } - - if (pQInfo->killed) { + + SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + if (isQueryKilled(pQuery)) { dTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code); if (pQInfo->code == TSDB_CODE_SUCCESS) { return TSDB_CODE_QUERY_CANCELLED; } else { // in case of not TSDB_CODE_SUCCESS, return the code to client - return abs(pQInfo->code); + return (pQInfo->code >= 0)? pQInfo->code:(-pQInfo->code); } } sem_wait(&pQInfo->dataReady); - SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - *numOfRows = pQInfo->rec.pointsRead; *rowsize = pQuery->rowSize; dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, *rowsize, *numOfRows, pQInfo->code); - - if (pQInfo->code < 0) { // less than 0 means there are error existed. - return -pQInfo->code; - } + + return (pQInfo->code >= 0)? pQInfo->code:(-pQInfo->code); } static size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) { @@ -6250,6 +6230,11 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int32_t *size) { pQInfo->rec.pointsTotal += pQInfo->rec.pointsRead; dTrace("QInfo:%p current:%d, total:%d", pQInfo, pQInfo->rec.pointsRead, pQInfo->rec.pointsTotal); + + setQueryStatus(pQuery, QUERY_COMPLETED); + return TSDB_CODE_SUCCESS; + + // todo if interpolation exists, the result may be dump to client by several rounds } static void addToTaskQueue(SQInfo* pQInfo) { @@ -6261,11 +6246,7 @@ static void addToTaskQueue(SQInfo* pQInfo) { dTrace("QInfo:%p set query flag, sig:%" PRIu64 ", func:%s", pQInfo, pQInfo->signature, __FUNCTION__); #endif - if (pQInfo->killed == 1) { - dTrace("%p freed or killed, abort query", pQInfo); - } else { - // todo add to task queue - } + // todo add to task queue } } @@ -6293,12 +6274,20 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c } if (pQInfo->rec.pointsRead > 0 && code == TSDB_CODE_SUCCESS) { - doDumpQueryResult(pQInfo, (*pRsp)->data, NULL); + code = doDumpQueryResult(pQInfo, (*pRsp)->data, NULL); + + // has more data to return or need next round to execute addToTaskQueue(pQInfo); - return TSDB_CODE_SUCCESS; + } else if (isQueryKilled(pQuery)) { + code = TSDB_CODE_QUERY_CANCELLED; } - assert(code != TSDB_CODE_ACTION_IN_PROGRESS); + if (isQueryKilled(pQuery) || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { + (*pRsp)->completed = 1; // notify no more result to client + vnodeFreeQInfo(pQInfo); + } + + return code; // if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) { // dTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code); diff --git a/src/util/src/shash.c b/src/util/src/shash.c index 525d00e81e..da97af84bb 100644 --- a/src/util/src/shash.c +++ b/src/util/src/shash.c @@ -162,8 +162,6 @@ void taosDeleteStrHash(void *handle, char *string) { if (pObj == NULL || pObj->maxSessions == 0) return; if (string == NULL || string[0] == 0) return; - return; - hash = (*(pObj->hashFp))(pObj, string); pthread_mutex_lock(&pObj->mutex); From b4e4e5a50b612a95211b9da4c78dcacd1bd550a5 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 24 Mar 2020 16:18:43 +0800 Subject: [PATCH 3/5] [td-32] fix bugs and refactor codes --- src/client/src/tscSql.c | 62 +++-------------------------------------- 1 file changed, 4 insertions(+), 58 deletions(-) diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index fa50551548..4885cf7cc3 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -653,62 +653,6 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) { return pRes->tsrow; } -TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { - SSqlObj *pSql = (SSqlObj *)res; - SSqlCmd *pCmd = &pSql->cmd; - SSqlRes *pRes = &pSql->res; - - if (pRes->qhandle == 0 || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { - return NULL; - } - - if (pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) { - tscFetchDatablockFromSubquery(pSql); - - if (pRes->code == TSDB_CODE_SUCCESS) { - tscTrace("%p data from all subqueries have been retrieved to client", pSql); - return tscBuildResFromSubqueries(pSql); - } else { - tscTrace("%p retrieve data from subquery failed, code:%d", pSql, pRes->code); - return NULL; - } - - } else if (pRes->row >= pRes->numOfRows) { - /** - * NOT a join query - * - * If the data block of current result set have been consumed already, try fetch next result - * data block from virtual node. - */ - tscResetForNextRetrieve(pRes); - - if (pCmd->command < TSDB_SQL_LOCAL) { - pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; - } - - tscProcessSql(pSql); // retrieve data from virtual node - - // if failed to retrieve data from current virtual node, try next one if exists - if (hasMoreVnodesToTry(pSql)) { - tscTryQueryNextVnode(pSql, NULL); - } - - /* - * local reducer has handle this case, - * so no need to add the pRes->numOfRows for super table query - */ - if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC) { - pRes->numOfTotalInCurrentClause += pRes->numOfRows; - } - - if (pRes->numOfRows == 0) { - return NULL; - } - } - - return doSetResultRowData(pSql); -} - static void asyncFetchCallback(void *param, TAOS_RES *tres, int numOfRows) { SSqlObj* pSql = (SSqlObj*) tres; if (numOfRows < 0) { @@ -729,8 +673,10 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - if (pRes->qhandle == 0 || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pCmd->command == TSDB_SQL_INSERT || - pRes->completed) { + if (pRes->qhandle == 0 || + pRes->completed || + pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || + pCmd->command == TSDB_SQL_INSERT) { return NULL; } From e0863b4c0b8428aaa894a0004c1cd6ae6ee633e7 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Tue, 24 Mar 2020 17:48:29 +0800 Subject: [PATCH 4/5] TD-34 --- src/vnode/tsdb/inc/tsdbFile.h | 59 +++++++-- src/vnode/tsdb/src/tsdbFile.c | 236 +++++++++++++++++----------------- src/vnode/tsdb/src/tsdbMain.c | 66 +++++----- 3 files changed, 201 insertions(+), 160 deletions(-) diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index 385d1f59ea..9a4d94c58f 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -34,15 +34,22 @@ typedef enum { TSDB_FILE_TYPE_MAX } TSDB_FILE_TYPE; +#define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) < TSDB_FILE_TYPE_MAX) + extern const char *tsdbFileSuffix[]; typedef struct { int8_t type; + int fd; char fname[128]; int64_t size; // total size of the file int64_t tombSize; // unused file size + int32_t totalBlocks; + int32_t totalSubBlocks; } SFile; +#define TSDB_IS_FILE_OPENED(f) ((f)->fd != -1) + typedef struct { int32_t fileId; SFile files[TSDB_FILE_TYPE_MAX]; @@ -50,14 +57,26 @@ typedef struct { // TSDB file handle typedef struct { - int32_t daysPerFile; - int32_t keep; - int32_t minRowPerFBlock; - int32_t maxRowsPerFBlock; - int32_t maxTables; + int maxFGroups; + int numOfFGroups; + SFileGroup fGroup[]; } STsdbFileH; +#define TSDB_MIN_FILE_ID(fh) (fh)->fGroup[0].fileId +#define TSDB_MAX_FILE_ID(fh) (fh)->fGroup[(fh)->numOfFGroups - 1].fileId + +STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles); +void tsdbCloseFileH(STsdbFileH *pFileH); +int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables); +int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid); + +typedef struct { + int32_t len; + int32_t padding; // For padding purpose + int64_t offset; +} SCompIdx; + /** * if numOfSubBlocks == -1, then the SCompBlock is a sub-block * if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to @@ -78,14 +97,32 @@ typedef struct { TSKEY keyLast; } SCompBlock; -#define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) < TSDB_FILE_TYPE_MAX) +typedef struct { + int32_t delimiter; // For recovery usage + int32_t checksum; // TODO: decide if checksum logic in this file or make it one API + int64_t uid; + int32_t padding; // For padding purpose + int32_t numOfBlocks; // TODO: make the struct padding + SCompBlock blocks[]; +} SCompInfo; -STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock, - int32_t maxRowsPerFBlock, int32_t maxTables); +// TODO: take pre-calculation into account +typedef struct { + int16_t colId; // Column ID + int16_t len; // Column length + int32_t type : 8; + int32_t offset : 24; +} SCompCol; -void tsdbCloseFile(STsdbFileH *pFileH); -int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables); -void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey); +// TODO: Take recover into account +typedef struct { + int32_t delimiter; // For recovery usage + int32_t numOfCols; // For recovery usage + int64_t uid; // For recovery usage + SCompCol cols[]; +} SCompData; + +void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey); #ifdef __cplusplus } #endif diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index 98562be0cc..f622c38b5f 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -27,72 +27,126 @@ #define TSDB_FILE_HEAD_SIZE 512 #define TSDB_FILE_DELIMITER 0xF00AFA0F -typedef struct { - int32_t len; - int32_t padding; // For padding purpose - int64_t offset; -} SCompIdx; - -typedef struct { - int32_t delimiter; // For recovery usage - int32_t checksum; // TODO: decide if checksum logic in this file or make it one API - int64_t uid; - int32_t padding; // For padding purpose - int32_t numOfBlocks; // TODO: make the struct padding - SCompBlock blocks[]; -} SCompInfo; - -// TODO: take pre-calculation into account -typedef struct { - int16_t colId; // Column ID - int16_t len; // Column length - int32_t type : 8; - int32_t offset : 24; -} SCompCol; - -// TODO: Take recover into account -typedef struct { - int32_t delimiter; // For recovery usage - int32_t numOfCols; // For recovery usage - int64_t uid; // For recovery usage - SCompCol cols[]; -} SCompData; - const char *tsdbFileSuffix[] = { ".head", // TSDB_FILE_TYPE_HEAD ".data", // TSDB_FILE_TYPE_DATA ".last" // TSDB_FILE_TYPE_LAST }; -static int tsdbWriteFileHead(int fd, SFile *pFile) { +static int compFGroupKey(const void *key, const void *fgroup); +static int compFGroup(const void *arg1, const void *arg2); +static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname); +static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile); +static int tsdbWriteFileHead(SFile *pFile); +static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables); + +STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { + STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * maxFiles); + if (pFileH == NULL) { // TODO: deal with ERROR here + return NULL; + } + + pFileH->maxFGroups = maxFiles; + + DIR *dir = opendir(dataDir); + if (dir == NULL) { + free(pFileH); + return NULL; + } + + struct dirent *dp; + while ((dp = readdir(dir)) != NULL) { + if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 1) == 0) continue; + // TODO + } + + return pFileH; +} + +void tsdbCloseFileH(STsdbFileH *pFileH) { free(pFileH); } + +int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) { + if (pFileH->numOfFGroups >= pFileH->maxFGroups) return -1; + + SFileGroup fGroup; + SFileGroup *pFGroup = &fGroup; + if (fid < TSDB_MIN_FILE_ID(pFileH) || fid > TSDB_MAX_FILE_ID(pFileH) || + bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey) == + NULL) { + pFGroup->fileId = fid; + for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + if (tsdbCreateFile(dataDir, fid, type, maxTables, &(pFGroup->files[type])) < 0) { + // TODO: deal with the ERROR here, remove those creaed file + return -1; + } + } + + pFileH->fGroup[pFileH->numOfFGroups++] = fGroup; + qsort((void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroup); + } + return 0; +} + +int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) { + SFileGroup *pGroup = + bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey); + if (pGroup == NULL) return -1; + + // Remove from disk + for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + remove(pGroup->files[type].fname); + } + + // Adjust the memory + int filesBehind = pFileH->numOfFGroups - (((char *)pGroup - (char *)(pFileH->fGroup)) / sizeof(SFileGroup) + 1); + if (filesBehind > 0) { + memmove((void *)pGroup, (void *)((char *)pGroup + sizeof(SFileGroup)), sizeof(SFileGroup) * filesBehind); + } + pFileH->numOfFGroups--; + + return 0; +} + +static int compFGroupKey(const void *key, const void *fgroup) { + int fid = *(int *)key; + SFileGroup *pFGroup = (SFileGroup *)fgroup; + return (fid - pFGroup->fileId); +} + +static int compFGroup(const void *arg1, const void *arg2) { + return ((SFileGroup *)arg1)->fileId - ((SFileGroup *)arg2)->fileId; +} + +static int tsdbWriteFileHead(SFile *pFile) { char head[TSDB_FILE_HEAD_SIZE] = "\0"; pFile->size += TSDB_FILE_HEAD_SIZE; // TODO: write version and File statistic to the head - lseek(fd, 0, SEEK_SET); - if (write(fd, head, TSDB_FILE_HEAD_SIZE) < 0) return -1; + lseek(pFile->fd, 0, SEEK_SET); + if (write(pFile->fd, head, TSDB_FILE_HEAD_SIZE) < 0) return -1; return 0; } -static int tsdbWriteHeadFileIdx(int fd, int maxTables, SFile *pFile) { +static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) { int size = sizeof(SCompIdx) * maxTables; void *buf = calloc(1, size); if (buf == NULL) return -1; - if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) { + if (lseek(pFile->fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) { free(buf); return -1; } - if (write(fd, buf, size) < 0) { + if (write(pFile->fd, buf, size) < 0) { free(buf); return -1; } pFile->size += size; + free(buf); return 0; } @@ -104,12 +158,27 @@ static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname) return 0; } -/** - * Create a file and set the SFile object - */ +static int tsdbOpenFileForWrite(SFile *pFile, int oflag) { // TODO: change the function + if (TSDB_IS_FILE_OPENED(pFile)) return -1; + + pFile->fd = open(pFile->fname, oflag, 0755); + if (pFile->fd < 0) return -1; + + return 0; +} + +static int tsdbCloseFile(SFile *pFile) { + if (!TSDB_IS_FILE_OPENED(pFile)) return -1; + int ret = close(pFile->fd); + pFile->fd = -1; + + return ret; +} + static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile) { memset((void *)pFile, 0, sizeof(SFile)); pFile->type = type; + pFile->fd = -1; tsdbGetFileName(dataDir, fileId, type, pFile->fname); if (access(pFile->fname, F_OK) == 0) { @@ -117,93 +186,28 @@ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, return -1; } - int fd = open(pFile->fname, O_WRONLY | O_CREAT, 0755); - if (fd < 0) return -1; - - if (type == TSDB_FILE_TYPE_HEAD) { - if (tsdbWriteHeadFileIdx(fd, maxTables, pFile) < 0) { - close(fd); - return -1; - } - } - - if (tsdbWriteFileHead(fd, pFile) < 0) { - close(fd); + if (tsdbOpenFileForWrite(pFile, O_WRONLY | O_CREAT) < 0) { + // TODO: deal with the ERROR here return -1; } - close(fd); - - return 0; -} - -static int tsdbRemoveFile(SFile *pFile) { - if (pFile == NULL) return -1; - return remove(pFile->fname); -} - -// Create a file group with fileId and return a SFileGroup object -int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables) { - if (dataDir == NULL || pFGroup == NULL) return -1; - - memset((void *)pFGroup, 0, sizeof(SFileGroup)); - - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { - if (tsdbCreateFile(dataDir, fileId, type, maxTables, &(pFGroup->files[type])) < 0) { - // TODO: deal with the error here, remove the created files + if (type == TSDB_FILE_TYPE_HEAD) { + if (tsdbWriteHeadFileIdx(pFile, maxTables) < 0) { + tsdbCloseFile(pFile); return -1; } } - pFGroup->fileId = fileId; + if (tsdbWriteFileHead(pFile) < 0) { + tsdbCloseFile(pFile); + return -1; + } + + tsdbCloseFile(pFile); return 0; } -/** - * Initialize the TSDB file handle - */ -STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock, - int32_t maxRowsPerFBlock, int32_t maxTables) { - STsdbFileH *pTsdbFileH = - (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * tsdbGetMaxNumOfFiles(keep, daysPerFile)); - if (pTsdbFileH == NULL) return NULL; - - pTsdbFileH->daysPerFile = daysPerFile; - pTsdbFileH->keep = keep; - pTsdbFileH->minRowPerFBlock = minRowsPerFBlock; - pTsdbFileH->maxRowsPerFBlock = maxRowsPerFBlock; - pTsdbFileH->maxTables = maxTables; - - // Open the directory to read information of each file - DIR *dir = opendir(dataDir); - if (dir == NULL) { - free(pTsdbFileH); - return NULL; - } - - char fname[256]; - - struct dirent *dp; - while ((dp = readdir(dir)) != NULL) { - if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 2) == 0) continue; - if (true /* check if the file is the .head file */) { - int fileId = 0; - int vgId = 0; - sscanf(dp->d_name, "v%df%d.head", &vgId, &fileId); - // TODO - - // Open head file - - // Open data file - - // Open last file - } - } - - return pTsdbFileH; -} - void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey) { *minKey = fileId * daysPerFile * tsMsPerDay[precision]; diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index afd57947b7..07ea9bd11a 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -182,7 +182,7 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO char dataDir[128] = "\0"; tsdbGetDataDirName(pRepo, dataDir); pRepo->tsdbFileH = - tsdbInitFile(dataDir, pCfg->daysPerFile, pCfg->keep, pCfg->minRowsPerFileBlock, pCfg->maxRowsPerFileBlock, pCfg->maxTables); + tsdbInitFileH(dataDir, pCfg->maxTables); if (pRepo->tsdbFileH == NULL) { free(pRepo->rootDir); tsdbFreeCache(pRepo->tsdbCache); @@ -787,7 +787,7 @@ static void tsdbDestroyTableIters(SSkipListIterator **iters, int maxTables) { for (int tid = 0; tid < maxTables; tid++) { if (iters[tid] == NULL) continue; - tSkipListDestroy(iters[tid]); + tSkipListDestroyIter(iters[tid]); } free(iters); @@ -836,42 +836,42 @@ static void *tsdbCommitToFile(void *arg) { SDataCol **cols = (SDataCol **)malloc(sizeof(SDataCol *) * maxCols); void *buf = malloc((maxBytes + sizeof(SDataCol)) * pCfg->maxRowsPerFileBlock); - int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision); - int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision); + // int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision); + // int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision); - for (int fid = sfid; fid <= efid; fid++) { - TSKEY minKey = 0, maxKey = 0; - tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); + // for (int fid = sfid; fid <= efid; fid++) { + // TSKEY minKey = 0, maxKey = 0; + // tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); - // tsdbOpenFileForWrite(pRepo, fid); + // // tsdbOpenFileForWrite(pRepo, fid); - for (int tid = 0; tid < pCfg->maxTables; tid++) { - STable *pTable = pMeta->tables[tid]; - if (pTable == NULL || pTable->imem == NULL) continue; - if (iters[tid] == NULL) { // create table iterator - iters[tid] = tSkipListCreateIter(pTable->imem->pData); - // TODO: deal with the error - if (iters[tid] == NULL) break; - if (!tSkipListIterNext(iters[tid])) { - // assert(0); - } - } + // for (int tid = 0; tid < pCfg->maxTables; tid++) { + // STable *pTable = pMeta->tables[tid]; + // if (pTable == NULL || pTable->imem == NULL) continue; + // if (iters[tid] == NULL) { // create table iterator + // iters[tid] = tSkipListCreateIter(pTable->imem->pData); + // // TODO: deal with the error + // if (iters[tid] == NULL) break; + // if (!tSkipListIterNext(iters[tid])) { + // // assert(0); + // } + // } - // Init row data part - cols[0] = (SDataCol *)buf; - for (int col = 1; col < schemaNCols(pTable->schema); col++) { - cols[col] = (SDataCol *)((char *)(cols[col - 1]) + sizeof(SDataCol) + colBytes(schemaColAt(pTable->schema, col-1)) * pCfg->maxRowsPerFileBlock); - } + // // Init row data part + // cols[0] = (SDataCol *)buf; + // for (int col = 1; col < schemaNCols(pTable->schema); col++) { + // cols[col] = (SDataCol *)((char *)(cols[col - 1]) + sizeof(SDataCol) + colBytes(schemaColAt(pTable->schema, col-1)) * pCfg->maxRowsPerFileBlock); + // } - // Loop the iterator - int rowsRead = 0; - while ((rowsRead = tsdbReadRowsFromCache(iters[tid], maxKey, pCfg->maxRowsPerFileBlock, cols, pTable->schema)) > - 0) { - // printf("rowsRead:%d-----------\n", rowsRead); - int k = 0; - } - } - } + // // Loop the iterator + // int rowsRead = 0; + // while ((rowsRead = tsdbReadRowsFromCache(iters[tid], maxKey, pCfg->maxRowsPerFileBlock, cols, pTable->schema)) > + // 0) { + // // printf("rowsRead:%d-----------\n", rowsRead); + // int k = 0; + // } + // } + // } tsdbDestroyTableIters(iters, pCfg->maxTables); From 377517eec46cc5f3d12f6f38fdf3e589ccf67640 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Tue, 24 Mar 2020 18:30:25 +0800 Subject: [PATCH 5/5] TD-34 --- src/vnode/tsdb/src/tsdbMain.c | 63 +++++++++++++++--------------- src/vnode/tsdb/tests/tsdbTests.cpp | 2 +- 2 files changed, 33 insertions(+), 32 deletions(-) diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 07ea9bd11a..af3a923d90 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -818,6 +818,7 @@ static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables) // Commit to file static void *tsdbCommitToFile(void *arg) { // TODO + printf("Starting to commit....\n"); STsdbRepo * pRepo = (STsdbRepo *)arg; STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbCache *pCache = pRepo->tsdbCache; @@ -836,42 +837,42 @@ static void *tsdbCommitToFile(void *arg) { SDataCol **cols = (SDataCol **)malloc(sizeof(SDataCol *) * maxCols); void *buf = malloc((maxBytes + sizeof(SDataCol)) * pCfg->maxRowsPerFileBlock); - // int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision); - // int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision); + int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision); + int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision); - // for (int fid = sfid; fid <= efid; fid++) { - // TSKEY minKey = 0, maxKey = 0; - // tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); + for (int fid = sfid; fid <= efid; fid++) { + TSKEY minKey = 0, maxKey = 0; + tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); - // // tsdbOpenFileForWrite(pRepo, fid); + // tsdbOpenFileForWrite(pRepo, fid); - // for (int tid = 0; tid < pCfg->maxTables; tid++) { - // STable *pTable = pMeta->tables[tid]; - // if (pTable == NULL || pTable->imem == NULL) continue; - // if (iters[tid] == NULL) { // create table iterator - // iters[tid] = tSkipListCreateIter(pTable->imem->pData); - // // TODO: deal with the error - // if (iters[tid] == NULL) break; - // if (!tSkipListIterNext(iters[tid])) { - // // assert(0); - // } - // } + for (int tid = 0; tid < pCfg->maxTables; tid++) { + STable *pTable = pMeta->tables[tid]; + if (pTable == NULL || pTable->imem == NULL) continue; + if (iters[tid] == NULL) { // create table iterator + iters[tid] = tSkipListCreateIter(pTable->imem->pData); + // TODO: deal with the error + if (iters[tid] == NULL) break; + if (!tSkipListIterNext(iters[tid])) { + // assert(0); + } + } - // // Init row data part - // cols[0] = (SDataCol *)buf; - // for (int col = 1; col < schemaNCols(pTable->schema); col++) { - // cols[col] = (SDataCol *)((char *)(cols[col - 1]) + sizeof(SDataCol) + colBytes(schemaColAt(pTable->schema, col-1)) * pCfg->maxRowsPerFileBlock); - // } + // Init row data part + cols[0] = (SDataCol *)buf; + for (int col = 1; col < schemaNCols(pTable->schema); col++) { + cols[col] = (SDataCol *)((char *)(cols[col - 1]) + sizeof(SDataCol) + colBytes(schemaColAt(pTable->schema, col-1)) * pCfg->maxRowsPerFileBlock); + } - // // Loop the iterator - // int rowsRead = 0; - // while ((rowsRead = tsdbReadRowsFromCache(iters[tid], maxKey, pCfg->maxRowsPerFileBlock, cols, pTable->schema)) > - // 0) { - // // printf("rowsRead:%d-----------\n", rowsRead); - // int k = 0; - // } - // } - // } + // Loop the iterator + int rowsRead = 0; + while ((rowsRead = tsdbReadRowsFromCache(iters[tid], maxKey, pCfg->maxRowsPerFileBlock, cols, pTable->schema)) > + 0) { + // printf("rowsRead:%d-----------\n", rowsRead); + int k = 0; + } + } + } tsdbDestroyTableIters(iters, pCfg->maxTables); diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp index a76aef2d41..bc6532984f 100644 --- a/src/vnode/tsdb/tests/tsdbTests.cpp +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -142,7 +142,7 @@ TEST(TsdbTest, DISABLED_openRepo) { TEST(TsdbTest, DISABLED_createFileGroup) { SFileGroup fGroup; - ASSERT_EQ(tsdbCreateFileGroup("/home/ubuntu/work/ttest/vnode0/data", 1820, &fGroup, 1000), 0); + // ASSERT_EQ(tsdbCreateFileGroup("/home/ubuntu/work/ttest/vnode0/data", 1820, &fGroup, 1000), 0); int k = 0; } \ No newline at end of file