Merge branch '3.0' into feature/TD-11463-3.0
This commit is contained in:
commit
e2e1d4c899
|
@ -131,69 +131,91 @@
|
|||
#define TK_FUNCTIONS 113
|
||||
#define TK_INDEXES 114
|
||||
#define TK_FROM 115
|
||||
#define TK_LIKE 116
|
||||
#define TK_INDEX 117
|
||||
#define TK_FULLTEXT 118
|
||||
#define TK_FUNCTION 119
|
||||
#define TK_INTERVAL 120
|
||||
#define TK_TOPIC 121
|
||||
#define TK_AS 122
|
||||
#define TK_DESC 123
|
||||
#define TK_DESCRIBE 124
|
||||
#define TK_RESET 125
|
||||
#define TK_QUERY 126
|
||||
#define TK_EXPLAIN 127
|
||||
#define TK_ANALYZE 128
|
||||
#define TK_VERBOSE 129
|
||||
#define TK_NK_BOOL 130
|
||||
#define TK_RATIO 131
|
||||
#define TK_NULL 132
|
||||
#define TK_NK_VARIABLE 133
|
||||
#define TK_NK_UNDERLINE 134
|
||||
#define TK_ROWTS 135
|
||||
#define TK_TBNAME 136
|
||||
#define TK_QSTARTTS 137
|
||||
#define TK_QENDTS 138
|
||||
#define TK_WSTARTTS 139
|
||||
#define TK_WENDTS 140
|
||||
#define TK_WDURATION 141
|
||||
#define TK_BETWEEN 142
|
||||
#define TK_IS 143
|
||||
#define TK_NK_LT 144
|
||||
#define TK_NK_GT 145
|
||||
#define TK_NK_LE 146
|
||||
#define TK_NK_GE 147
|
||||
#define TK_NK_NE 148
|
||||
#define TK_MATCH 149
|
||||
#define TK_NMATCH 150
|
||||
#define TK_IN 151
|
||||
#define TK_JOIN 152
|
||||
#define TK_INNER 153
|
||||
#define TK_SELECT 154
|
||||
#define TK_DISTINCT 155
|
||||
#define TK_WHERE 156
|
||||
#define TK_PARTITION 157
|
||||
#define TK_BY 158
|
||||
#define TK_SESSION 159
|
||||
#define TK_STATE_WINDOW 160
|
||||
#define TK_SLIDING 161
|
||||
#define TK_FILL 162
|
||||
#define TK_VALUE 163
|
||||
#define TK_NONE 164
|
||||
#define TK_PREV 165
|
||||
#define TK_LINEAR 166
|
||||
#define TK_NEXT 167
|
||||
#define TK_GROUP 168
|
||||
#define TK_HAVING 169
|
||||
#define TK_ORDER 170
|
||||
#define TK_SLIMIT 171
|
||||
#define TK_SOFFSET 172
|
||||
#define TK_LIMIT 173
|
||||
#define TK_OFFSET 174
|
||||
#define TK_ASC 175
|
||||
#define TK_NULLS 176
|
||||
#define TK_FIRST 177
|
||||
#define TK_LAST 178
|
||||
#define TK_ACCOUNTS 116
|
||||
#define TK_APPS 117
|
||||
#define TK_CONNECTIONS 118
|
||||
#define TK_LICENCE 119
|
||||
#define TK_QUERIES 120
|
||||
#define TK_SCORES 121
|
||||
#define TK_TOPICS 122
|
||||
#define TK_VARIABLES 123
|
||||
#define TK_LIKE 124
|
||||
#define TK_INDEX 125
|
||||
#define TK_FULLTEXT 126
|
||||
#define TK_FUNCTION 127
|
||||
#define TK_INTERVAL 128
|
||||
#define TK_TOPIC 129
|
||||
#define TK_AS 130
|
||||
#define TK_DESC 131
|
||||
#define TK_DESCRIBE 132
|
||||
#define TK_RESET 133
|
||||
#define TK_QUERY 134
|
||||
#define TK_EXPLAIN 135
|
||||
#define TK_ANALYZE 136
|
||||
#define TK_VERBOSE 137
|
||||
#define TK_NK_BOOL 138
|
||||
#define TK_RATIO 139
|
||||
#define TK_COMPACT 140
|
||||
#define TK_VNODES 141
|
||||
#define TK_IN 142
|
||||
#define TK_OUTPUTTYPE 143
|
||||
#define TK_AGGREGATE 144
|
||||
#define TK_BUFSIZE 145
|
||||
#define TK_STREAM 146
|
||||
#define TK_INTO 147
|
||||
#define TK_KILL 148
|
||||
#define TK_CONNECTION 149
|
||||
#define TK_MERGE 150
|
||||
#define TK_VGROUP 151
|
||||
#define TK_REDISTRIBUTE 152
|
||||
#define TK_SPLIT 153
|
||||
#define TK_SYNCDB 154
|
||||
#define TK_NULL 155
|
||||
#define TK_NK_VARIABLE 156
|
||||
#define TK_NK_UNDERLINE 157
|
||||
#define TK_ROWTS 158
|
||||
#define TK_TBNAME 159
|
||||
#define TK_QSTARTTS 160
|
||||
#define TK_QENDTS 161
|
||||
#define TK_WSTARTTS 162
|
||||
#define TK_WENDTS 163
|
||||
#define TK_WDURATION 164
|
||||
#define TK_BETWEEN 165
|
||||
#define TK_IS 166
|
||||
#define TK_NK_LT 167
|
||||
#define TK_NK_GT 168
|
||||
#define TK_NK_LE 169
|
||||
#define TK_NK_GE 170
|
||||
#define TK_NK_NE 171
|
||||
#define TK_MATCH 172
|
||||
#define TK_NMATCH 173
|
||||
#define TK_JOIN 174
|
||||
#define TK_INNER 175
|
||||
#define TK_SELECT 176
|
||||
#define TK_DISTINCT 177
|
||||
#define TK_WHERE 178
|
||||
#define TK_PARTITION 179
|
||||
#define TK_BY 180
|
||||
#define TK_SESSION 181
|
||||
#define TK_STATE_WINDOW 182
|
||||
#define TK_SLIDING 183
|
||||
#define TK_FILL 184
|
||||
#define TK_VALUE 185
|
||||
#define TK_NONE 186
|
||||
#define TK_PREV 187
|
||||
#define TK_LINEAR 188
|
||||
#define TK_NEXT 189
|
||||
#define TK_GROUP 190
|
||||
#define TK_HAVING 191
|
||||
#define TK_ORDER 192
|
||||
#define TK_SLIMIT 193
|
||||
#define TK_SOFFSET 194
|
||||
#define TK_LIMIT 195
|
||||
#define TK_OFFSET 196
|
||||
#define TK_ASC 197
|
||||
#define TK_NULLS 198
|
||||
#define TK_FIRST 199
|
||||
#define TK_LAST 200
|
||||
|
||||
#define TK_NK_SPACE 300
|
||||
#define TK_NK_COMMENT 301
|
||||
|
@ -207,10 +229,9 @@
|
|||
#define TK_NK_COLON 500
|
||||
#define TK_NK_BITNOT 501
|
||||
#define TK_INSERT 502
|
||||
#define TK_INTO 503
|
||||
#define TK_NOW 504
|
||||
#define TK_VALUES 507
|
||||
#define TK_IMPORT 507
|
||||
#define TK_IMPORT 509
|
||||
#define TK_NK_SEMI 508
|
||||
|
||||
#define TK_NK_NIL 65535
|
||||
|
|
|
@ -197,6 +197,12 @@ typedef struct SShowStmt {
|
|||
SNode* pTbNamePattern; // SValueNode
|
||||
} SShowStmt;
|
||||
|
||||
typedef struct SShowCreatStmt {
|
||||
ENodeType type;
|
||||
char dbName[TSDB_DB_NAME_LEN];
|
||||
char tableName[TSDB_TABLE_NAME_LEN];
|
||||
} SShowCreatStmt;
|
||||
|
||||
typedef enum EIndexType {
|
||||
INDEX_TYPE_SMA = 1,
|
||||
INDEX_TYPE_FULLTEXT
|
||||
|
|
|
@ -103,6 +103,15 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_EXPLAIN_STMT,
|
||||
QUERY_NODE_DESCRIBE_STMT,
|
||||
QUERY_NODE_RESET_QUERY_CACHE_STMT,
|
||||
QUERY_NODE_COMPACT_STMT,
|
||||
QUERY_NODE_CREATE_FUNCTION_STMT,
|
||||
QUERY_NODE_DROP_FUNCTION_STMT,
|
||||
QUERY_NODE_CREATE_STREAM_STMT,
|
||||
QUERY_NODE_DROP_STREAM_STMT,
|
||||
QUERY_NODE_MERGE_VGROUP_STMT,
|
||||
QUERY_NODE_REDISTRIBUTE_VGROUP_STMT,
|
||||
QUERY_NODE_SPLIT_VGROUP_STMT,
|
||||
QUERY_NODE_SYNCDB_STMT,
|
||||
QUERY_NODE_SHOW_DATABASES_STMT,
|
||||
QUERY_NODE_SHOW_TABLES_STMT,
|
||||
QUERY_NODE_SHOW_STABLES_STMT,
|
||||
|
@ -115,6 +124,18 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_SHOW_FUNCTIONS_STMT,
|
||||
QUERY_NODE_SHOW_INDEXES_STMT,
|
||||
QUERY_NODE_SHOW_STREAMS_STMT,
|
||||
QUERY_NODE_SHOW_APPS_STMT,
|
||||
QUERY_NODE_SHOW_CONNECTIONS_STMT,
|
||||
QUERY_NODE_SHOW_LICENCE_STMT,
|
||||
QUERY_NODE_SHOW_CREATE_DATABASE_STMT,
|
||||
QUERY_NODE_SHOW_CREATE_TABLE_STMT,
|
||||
QUERY_NODE_SHOW_CREATE_STABLE_STMT,
|
||||
QUERY_NODE_SHOW_QUERIES_STMT,
|
||||
QUERY_NODE_SHOW_SCORES_STMT,
|
||||
QUERY_NODE_SHOW_TOPICS_STMT,
|
||||
QUERY_NODE_SHOW_VARIABLE_STMT,
|
||||
QUERY_NODE_KILL_CONNECTION_STMT,
|
||||
QUERY_NODE_KILL_QUERY_STMT,
|
||||
|
||||
// logic plan node
|
||||
QUERY_NODE_LOGIC_PLAN_SCAN,
|
||||
|
|
|
@ -32,6 +32,7 @@ extern "C" {
|
|||
void *taosMemoryMalloc(int32_t size);
|
||||
void *taosMemoryCalloc(int32_t num, int32_t size);
|
||||
void *taosMemoryRealloc(void *ptr, int32_t size);
|
||||
void *taosMemoryStrDup(void *ptr);
|
||||
void taosMemoryFree(const void *ptr);
|
||||
int32_t taosMemorySize(void *ptr);
|
||||
|
||||
|
|
|
@ -187,7 +187,7 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp
|
|||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
||||
if (pVnode == NULL) {
|
||||
dError("vgId:%d, failed to write msg:%p to vnode-queue since %s", pHead->vgId, pMsg, terrstr());
|
||||
return -1;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
|
|
|
@ -362,6 +362,7 @@ typedef struct SSourceDataInfo {
|
|||
int32_t index;
|
||||
SRetrieveTableRsp *pRsp;
|
||||
uint64_t totalRows;
|
||||
int32_t code;
|
||||
EX_SOURCE_STATUS status;
|
||||
} SSourceDataInfo;
|
||||
|
||||
|
|
|
@ -844,8 +844,7 @@ static int32_t setResultOutputBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowI
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf* pBuf, SResultRow* pResult, SqlFunctionCtx* pCtx,
|
||||
int32_t numOfOutput, int32_t* rowCellInfoOffset);
|
||||
static void setResultRowOutputBufInitCtx_rv(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
|
||||
|
||||
static int32_t setResultOutputBufByKey_rv(SResultRowInfo* pResultRowInfo, int64_t id, STimeWindow* win, bool masterscan,
|
||||
SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
|
||||
|
@ -863,7 +862,7 @@ static int32_t setResultOutputBufByKey_rv(SResultRowInfo* pResultRowInfo, int64_
|
|||
// set time window for current result
|
||||
pResultRow->win = (*win);
|
||||
*pResult = pResultRow;
|
||||
setResultRowOutputBufInitCtx_rv(pAggSup->pResultBuf, pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
|
||||
setResultRowOutputBufInitCtx_rv(pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1046,7 +1045,9 @@ static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin) {
|
|||
static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
|
||||
int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
|
||||
SScalarParam intervalParam = {.numOfRows = 5, .columnData = pTimeWindowData}; //TODO move out of this function
|
||||
updateTimeWindowInfo(pTimeWindowData, pWin);
|
||||
if (pTimeWindowData != NULL) {
|
||||
updateTimeWindowInfo(pTimeWindowData, pWin);
|
||||
}
|
||||
|
||||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||
pCtx[k].startTs = pWin->skey;
|
||||
|
@ -1894,9 +1895,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
}
|
||||
|
||||
/*int32_t ret = */ generatedHashKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
|
||||
int32_t ret =
|
||||
setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len,
|
||||
0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
|
||||
int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
|
||||
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
@ -2018,12 +2017,11 @@ static int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCo
|
|||
SqlFunctionCtx* pCtx = binfo->pCtx;
|
||||
|
||||
SResultRow* pResultRow = doSetResultOutBufByKey_rv(pBuf, pResultRowInfo, groupId, (char*)pData, bytes, true, groupId,
|
||||
pTaskInfo, true, pAggSup);
|
||||
pTaskInfo, false, pAggSup);
|
||||
assert(pResultRow != NULL);
|
||||
|
||||
setResultRowKey(pResultRow, pData, type);
|
||||
|
||||
setResultRowOutputBufInitCtx_rv(pBuf, pResultRow, pCtx, numOfCols, binfo->rowCellInfoOffset);
|
||||
setResultRowOutputBufInitCtx_rv(pResultRow, pCtx, numOfCols, binfo->rowCellInfoOffset);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -2178,8 +2176,8 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t num
|
|||
}
|
||||
}
|
||||
pCtx->resDataInfo.interBufSize = env.calcMemSize;
|
||||
} else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN) {
|
||||
} else if (pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR) {
|
||||
} else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR) {
|
||||
pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes; // for simple column, the intermediate buffer needs to hold one element.
|
||||
}
|
||||
|
||||
pCtx->input.numOfInputCols = pFunct->numOfParams;
|
||||
|
@ -3775,8 +3773,7 @@ void setResultRowOutputBufInitCtx(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pRes
|
|||
}
|
||||
}
|
||||
|
||||
void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf* pBuf, SResultRow* pResult, SqlFunctionCtx* pCtx,
|
||||
int32_t numOfOutput, int32_t* rowCellInfoOffset) {
|
||||
void setResultRowOutputBufInitCtx_rv(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) {
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset);
|
||||
|
||||
|
@ -3789,18 +3786,13 @@ void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf* pBuf, SResultRow* pResult, S
|
|||
continue;
|
||||
}
|
||||
|
||||
// if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) {
|
||||
// if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i - 1].pOutput;
|
||||
// }
|
||||
|
||||
if (!pResInfo->initialized && pCtx[i].functionId != -1) {
|
||||
pCtx[i].fpSet.init(&pCtx[i], pResInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId, SExecTaskInfo* pTaskInfo) {
|
||||
// for simple group by query without interval, all the tables belong to one group result.
|
||||
int64_t uid = 0;
|
||||
int64_t tid = 0;
|
||||
|
@ -3819,14 +3811,13 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, i
|
|||
* all group belong to one result set, and each group result has different group id so set the id to be one
|
||||
*/
|
||||
if (pResultRow->pageId == -1) {
|
||||
int32_t ret =
|
||||
addNewWindowResultBuf(pResultRow, pAggInfo->pResultBuf, tableGroupId, pAggInfo->binfo.pRes->info.rowSize);
|
||||
int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->pResultBuf, tableGroupId, pAggInfo->binfo.pRes->info.rowSize);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
setResultRowOutputBufInitCtx_rv(pAggInfo->pResultBuf, pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
|
||||
setResultRowOutputBufInitCtx_rv(pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
|
||||
}
|
||||
|
||||
void setExecutionContext(int32_t numOfOutput, int32_t tableGroupId, TSKEY nextKey, SExecTaskInfo* pTaskInfo,
|
||||
|
@ -5013,12 +5004,16 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
|
|||
|
||||
int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
SSourceDataInfo* pSourceDataInfo = (SSourceDataInfo*)param;
|
||||
pSourceDataInfo->pRsp = pMsg->pData;
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pSourceDataInfo->pRsp = pMsg->pData;
|
||||
|
||||
SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
|
||||
pRsp->numOfRows = htonl(pRsp->numOfRows);
|
||||
pRsp->useconds = htobe64(pRsp->useconds);
|
||||
pRsp->compLen = htonl(pRsp->compLen);
|
||||
SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
|
||||
pRsp->numOfRows = htonl(pRsp->numOfRows);
|
||||
pRsp->compLen = htonl(pRsp->compLen);
|
||||
pRsp->useconds = htobe64(pRsp->useconds);
|
||||
} else {
|
||||
pSourceDataInfo->code = code;
|
||||
}
|
||||
|
||||
pSourceDataInfo->status = EX_SOURCE_DATA_READY;
|
||||
tsem_post(&pSourceDataInfo->pEx->ready);
|
||||
|
@ -5274,7 +5269,6 @@ static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo* pOperator) {
|
|||
totalSources, endTs - startTs);
|
||||
|
||||
tsem_wait(&pExchangeInfo->ready);
|
||||
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
|
||||
}
|
||||
|
@ -5318,18 +5312,22 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
|
||||
|
||||
tsem_wait(&pExchangeInfo->ready);
|
||||
|
||||
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
|
||||
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
|
||||
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
|
||||
|
||||
if (pDataInfo->code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s vgId:%d, taskID:0x%" PRIx64 " error happens, code:%s",
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, tstrerror(pDataInfo->code));
|
||||
pOperator->pTaskInfo->code = pDataInfo->code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
|
||||
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
|
||||
|
||||
if (pRsp->numOfRows == 0) {
|
||||
qDebug("%s vgId:%d, taskID:0x%" PRIx64 " %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
|
||||
" try next",
|
||||
qDebug("%s vgId:%d, taskID:0x%" PRIx64 " %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next",
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
|
||||
pDataInfo->totalRows, pLoadInfo->totalRows);
|
||||
|
||||
|
|
|
@ -150,6 +150,8 @@ SNode* createAlterTableRenameCol(SAstCreateContext* pCxt, SNode* pRealTable, int
|
|||
SNode* createAlterTableSetTag(SAstCreateContext* pCxt, SNode* pRealTable, const SToken* pTagName, SNode* pVal);
|
||||
SNode* createUseDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName);
|
||||
SNode* createShowStmt(SAstCreateContext* pCxt, ENodeType type, SNode* pDbName, SNode* pTbNamePattern);
|
||||
SNode* createShowCreateDatabaseStmt(SAstCreateContext* pCxt, const SToken* pDbName);
|
||||
SNode* createShowCreateTableStmt(SAstCreateContext* pCxt, ENodeType type, SNode* pRealTable);
|
||||
SNode* createCreateUserStmt(SAstCreateContext* pCxt, SToken* pUserName, const SToken* pPassword);
|
||||
SNode* createAlterUserStmt(SAstCreateContext* pCxt, SToken* pUserName, int8_t alterType, const SToken* pVal);
|
||||
SNode* createDropUserStmt(SAstCreateContext* pCxt, SToken* pUserName);
|
||||
|
@ -170,6 +172,16 @@ SNode* setExplainRatio(SAstCreateContext* pCxt, SNode* pOptions, const SToken* p
|
|||
SNode* createExplainStmt(SAstCreateContext* pCxt, bool analyze, SNode* pOptions, SNode* pQuery);
|
||||
SNode* createDescribeStmt(SAstCreateContext* pCxt, SNode* pRealTable);
|
||||
SNode* createResetQueryCacheStmt(SAstCreateContext* pCxt);
|
||||
SNode* createCompactStmt(SAstCreateContext* pCxt, SNodeList* pVgroups);
|
||||
SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool aggFunc, const SToken* pFuncName, const SToken* pLibPath, SDataType dataType, int32_t bufSize);
|
||||
SNode* createDropFunctionStmt(SAstCreateContext* pCxt, const SToken* pFuncName);
|
||||
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, const SToken* pStreamName, const SToken* pTableName, SNode* pQuery);
|
||||
SNode* createDropStreamStmt(SAstCreateContext* pCxt, const SToken* pStreamName);
|
||||
SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId);
|
||||
SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2);
|
||||
SNode* createRedistributeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId, SNodeList* pDnodes);
|
||||
SNode* createSplitVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId);
|
||||
SNode* createSyncdbStmt(SAstCreateContext* pCxt, const SToken* pDbName);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -78,8 +78,6 @@ typedef struct STableDataBlocks {
|
|||
char *pData;
|
||||
bool cloned;
|
||||
STagData tagData;
|
||||
char tableName[TSDB_TABLE_NAME_LEN];
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
|
||||
SParsedDataColInfo boundColumnInfo;
|
||||
SRowBuilder rowBuilder;
|
||||
|
|
|
@ -301,6 +301,17 @@ cmd ::= SHOW QNODES.
|
|||
cmd ::= SHOW FUNCTIONS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_FUNCTIONS_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW INDEXES FROM table_name_cond(A) from_db_opt(B). { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_INDEXES_STMT, A, B); }
|
||||
cmd ::= SHOW STREAMS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_STREAMS_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW ACCOUNTS. { pCxt->valid = false; generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_EXPRIE_STATEMENT); }
|
||||
cmd ::= SHOW APPS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_APPS_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW CONNECTIONS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_CONNECTIONS_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW LICENCE. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_LICENCE_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW CREATE DATABASE db_name(A). { pCxt->pRootNode = createShowCreateDatabaseStmt(pCxt, &A); }
|
||||
cmd ::= SHOW CREATE TABLE full_table_name(A). { pCxt->pRootNode = createShowCreateTableStmt(pCxt, QUERY_NODE_SHOW_CREATE_TABLE_STMT, A); }
|
||||
cmd ::= SHOW CREATE STABLE full_table_name(A). { pCxt->pRootNode = createShowCreateTableStmt(pCxt, QUERY_NODE_SHOW_CREATE_STABLE_STMT, A); }
|
||||
cmd ::= SHOW QUERIES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_QUERIES_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW SCORES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SCORES_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW TOPICS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_TOPICS_STMT, NULL, NULL); }
|
||||
cmd ::= SHOW VARIABLES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VARIABLE_STMT, NULL, NULL); }
|
||||
|
||||
db_name_cond_opt(A) ::= . { A = createDefaultDatabaseCondValue(pCxt); }
|
||||
db_name_cond_opt(A) ::= db_name(B) NK_DOT. { A = createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B); }
|
||||
|
@ -364,6 +375,45 @@ explain_options(A) ::= .
|
|||
explain_options(A) ::= explain_options(B) VERBOSE NK_BOOL(C). { A = setExplainVerbose(pCxt, B, &C); }
|
||||
explain_options(A) ::= explain_options(B) RATIO NK_FLOAT(C). { A = setExplainRatio(pCxt, B, &C); }
|
||||
|
||||
/************************************************ compact *************************************************************/
|
||||
cmd ::= COMPACT VNODES IN NK_LP integer_list(A) NK_RP. { pCxt->pRootNode = createCompactStmt(pCxt, A); }
|
||||
|
||||
/************************************************ create/drop function ************************************************/
|
||||
cmd ::= CREATE agg_func_opt(A) FUNCTION function_name(B)
|
||||
AS NK_STRING(C) OUTPUTTYPE type_name(D) bufsize_opt(E). { pCxt->pRootNode = createCreateFunctionStmt(pCxt, A, &B, &C, D, E); }
|
||||
cmd ::= DROP FUNCTION function_name(A). { pCxt->pRootNode = createDropFunctionStmt(pCxt, &A); }
|
||||
|
||||
%type agg_func_opt { bool }
|
||||
%destructor agg_func_opt { }
|
||||
agg_func_opt(A) ::= . { A = false; }
|
||||
agg_func_opt(A) ::= AGGREGATE. { A = true; }
|
||||
|
||||
%type bufsize_opt { int32_t }
|
||||
%destructor bufsize_opt { }
|
||||
bufsize_opt(A) ::= . { A = 0; }
|
||||
bufsize_opt(A) ::= BUFSIZE NK_INTEGER(B). { A = strtol(B.z, NULL, 10); }
|
||||
|
||||
/************************************************ create/drop stream **************************************************/
|
||||
cmd ::= CREATE STREAM stream_name(A) INTO table_name(B) AS query_expression(C). { pCxt->pRootNode = createCreateStreamStmt(pCxt, &A, &B, C); }
|
||||
cmd ::= DROP STREAM stream_name(A). { pCxt->pRootNode = createDropStreamStmt(pCxt, &A); }
|
||||
|
||||
/************************************************ kill connection/query ***********************************************/
|
||||
cmd ::= KILL CONNECTION NK_INTEGER(A). { pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_CONNECTION_STMT, &A); }
|
||||
cmd ::= KILL QUERY NK_INTEGER(A). { pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_QUERY_STMT, &A); }
|
||||
|
||||
/************************************************ merge/redistribute/ vgroup ******************************************/
|
||||
cmd ::= MERGE VGROUP NK_INTEGER(A) NK_INTEGER(B). { pCxt->pRootNode = createMergeVgroupStmt(pCxt, &A, &B); }
|
||||
cmd ::= REDISTRIBUTE VGROUP NK_INTEGER(A) dnode_list(B). { pCxt->pRootNode = createRedistributeVgroupStmt(pCxt, &A, B); }
|
||||
cmd ::= SPLIT VGROUP NK_INTEGER(A). { pCxt->pRootNode = createSplitVgroupStmt(pCxt, &A); }
|
||||
|
||||
%type dnode_list { SNodeList* }
|
||||
%destructor dnode_list { nodesDestroyList($$); }
|
||||
dnode_list(A) ::= DNODE NK_INTEGER(B). { A = createNodeList(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B)); }
|
||||
dnode_list(A) ::= dnode_list(B) DNODE NK_INTEGER(C). { A = addNodeToList(pCxt, B, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &C)); }
|
||||
|
||||
/************************************************ syncdb **************************************************************/
|
||||
cmd ::= SYNCDB db_name(A) REPLICA. { pCxt->pRootNode = createSyncdbStmt(pCxt, &A); }
|
||||
|
||||
/************************************************ select **************************************************************/
|
||||
cmd ::= query_expression(A). { pCxt->pRootNode = A; }
|
||||
|
||||
|
@ -442,6 +492,10 @@ index_name(A) ::= NK_ID(B).
|
|||
%destructor topic_name { }
|
||||
topic_name(A) ::= NK_ID(B). { A = B; }
|
||||
|
||||
%type stream_name { SToken }
|
||||
%destructor stream_name { }
|
||||
stream_name(A) ::= NK_ID(B). { A = B; }
|
||||
|
||||
/************************************************ expression **********************************************************/
|
||||
expression(A) ::= literal(B). { A = B; }
|
||||
//expression(A) ::= NK_QUESTION(B). { A = B; }
|
||||
|
|
|
@ -1221,6 +1221,18 @@ SNode* createShowStmt(SAstCreateContext* pCxt, ENodeType type, SNode* pDbName, S
|
|||
return (SNode*)pStmt;
|
||||
}
|
||||
|
||||
SNode* createShowCreateDatabaseStmt(SAstCreateContext* pCxt, const SToken* pDbName) {
|
||||
SNode* pStmt = nodesMakeNode(QUERY_NODE_SHOW_CREATE_DATABASE_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
return pStmt;
|
||||
}
|
||||
|
||||
SNode* createShowCreateTableStmt(SAstCreateContext* pCxt, ENodeType type, SNode* pRealTable) {
|
||||
SNode* pStmt = nodesMakeNode(type);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
return pStmt;
|
||||
}
|
||||
|
||||
SNode* createCreateUserStmt(SAstCreateContext* pCxt, SToken* pUserName, const SToken* pPassword) {
|
||||
char password[TSDB_USET_PASSWORD_LEN] = {0};
|
||||
if (!checkUserName(pCxt, pUserName) || !checkPassword(pCxt, pPassword, password)) {
|
||||
|
@ -1433,3 +1445,63 @@ SNode* createResetQueryCacheStmt(SAstCreateContext* pCxt) {
|
|||
CHECK_OUT_OF_MEM(pStmt);
|
||||
return pStmt;
|
||||
}
|
||||
|
||||
SNode* createCompactStmt(SAstCreateContext* pCxt, SNodeList* pVgroups) {
|
||||
SNode* pStmt = nodesMakeNode(QUERY_NODE_COMPACT_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
return pStmt;
|
||||
}
|
||||
|
||||
SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool aggFunc, const SToken* pFuncName, const SToken* pLibPath, SDataType dataType, int32_t bufSize) {
|
||||
SNode* pStmt = nodesMakeNode(QUERY_NODE_CREATE_FUNCTION_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
return pStmt;
|
||||
}
|
||||
|
||||
SNode* createDropFunctionStmt(SAstCreateContext* pCxt, const SToken* pFuncName) {
|
||||
SNode* pStmt = nodesMakeNode(QUERY_NODE_DROP_FUNCTION_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
return pStmt;
|
||||
}
|
||||
|
||||
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, const SToken* pStreamName, const SToken* pTableName, SNode* pQuery) {
|
||||
SNode* pStmt = nodesMakeNode(QUERY_NODE_CREATE_STREAM_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
return pStmt;
|
||||
}
|
||||
|
||||
SNode* createDropStreamStmt(SAstCreateContext* pCxt, const SToken* pStreamName) {
|
||||
SNode* pStmt = nodesMakeNode(QUERY_NODE_DROP_STREAM_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
return pStmt;
|
||||
}
|
||||
|
||||
SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId) {
|
||||
SNode* pStmt = nodesMakeNode(type);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
return pStmt;
|
||||
}
|
||||
|
||||
SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2) {
|
||||
SNode* pStmt = nodesMakeNode(QUERY_NODE_MERGE_VGROUP_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
return pStmt;
|
||||
}
|
||||
|
||||
SNode* createRedistributeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId, SNodeList* pDnodes) {
|
||||
SNode* pStmt = nodesMakeNode(QUERY_NODE_REDISTRIBUTE_VGROUP_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
return pStmt;
|
||||
}
|
||||
|
||||
SNode* createSplitVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId) {
|
||||
SNode* pStmt = nodesMakeNode(QUERY_NODE_SPLIT_VGROUP_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
return pStmt;
|
||||
}
|
||||
|
||||
SNode* createSyncdbStmt(SAstCreateContext* pCxt, const SToken* pDbName) {
|
||||
SNode* pStmt = nodesMakeNode(QUERY_NODE_SYNCDB_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
return pStmt;
|
||||
}
|
||||
|
|
|
@ -52,8 +52,6 @@ typedef struct SInsertParseContext {
|
|||
SParseContext* pComCxt; // input
|
||||
char *pSql; // input
|
||||
SMsgBuf msg; // input
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
char tableName[TSDB_TABLE_NAME_LEN];
|
||||
STableMeta* pTableMeta; // each table
|
||||
SParsedDataColInfo tags; // each table
|
||||
SKVRowBuilder tagsBuilder; // each table
|
||||
|
@ -231,9 +229,6 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
|
|||
SVgroupInfo vg;
|
||||
CHECK_CODE(catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg));
|
||||
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
|
||||
pCxt->pTableMeta->vgId = vg.vgId; // todo remove
|
||||
strcpy(pCxt->tableName, name.tname);
|
||||
tNameGetFullDbName(&name, pCxt->dbFName);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -977,8 +972,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
|||
STableDataBlocks *dataBuf = NULL;
|
||||
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
|
||||
sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL));
|
||||
strcpy(dataBuf->tableName, pCxt->tableName);
|
||||
strcpy(dataBuf->dbFName, pCxt->dbFName);
|
||||
|
||||
if (TK_NK_LP == sToken.type) {
|
||||
// pSql -> field1_name, ...)
|
||||
|
|
|
@ -29,10 +29,14 @@ typedef struct SKeyword {
|
|||
// keywords in sql string
|
||||
static SKeyword keywordTable[] = {
|
||||
{"ACCOUNT", TK_ACCOUNT},
|
||||
{"ACCOUNTS", TK_ACCOUNTS},
|
||||
{"ADD", TK_ADD},
|
||||
{"AGGREGATE", TK_AGGREGATE},
|
||||
{"ALL", TK_ALL},
|
||||
{"ALTER", TK_ALTER},
|
||||
{"ANALYZE", TK_ANALYZE},
|
||||
{"AND", TK_AND},
|
||||
{"APPS", TK_APPS},
|
||||
{"AS", TK_AS},
|
||||
{"ASC", TK_ASC},
|
||||
{"BETWEEN", TK_BETWEEN},
|
||||
|
@ -40,15 +44,22 @@ static SKeyword keywordTable[] = {
|
|||
{"BIGINT", TK_BIGINT},
|
||||
{"BLOCKS", TK_BLOCKS},
|
||||
{"BOOL", TK_BOOL},
|
||||
{"BUFSIZE", TK_BUFSIZE},
|
||||
{"BY", TK_BY},
|
||||
{"CACHE", TK_CACHE},
|
||||
{"CACHELAST", TK_CACHELAST},
|
||||
{"COLUMN", TK_COLUMN},
|
||||
{"COMMENT", TK_COMMENT},
|
||||
{"COMP", TK_COMP},
|
||||
{"COMPACT", TK_COMPACT},
|
||||
{"CONNS", TK_CONNS},
|
||||
{"CONNECTION", TK_CONNECTION},
|
||||
{"CONNECTIONS", TK_CONNECTIONS},
|
||||
{"CREATE", TK_CREATE},
|
||||
{"DATABASE", TK_DATABASE},
|
||||
{"DATABASES", TK_DATABASES},
|
||||
{"DAYS", TK_DAYS},
|
||||
{"DBS", TK_DBS},
|
||||
{"DELAY", TK_DELAY},
|
||||
{"DESC", TK_DESC},
|
||||
{"DESCRIBE", TK_DESCRIBE},
|
||||
|
@ -83,14 +94,18 @@ static SKeyword keywordTable[] = {
|
|||
{"JOIN", TK_JOIN},
|
||||
{"JSON", TK_JSON},
|
||||
{"KEEP", TK_KEEP},
|
||||
{"KILL", TK_KILL},
|
||||
{"LICENCE", TK_LICENCE},
|
||||
{"LIKE", TK_LIKE},
|
||||
{"LIMIT", TK_LIMIT},
|
||||
{"LINEAR", TK_LINEAR},
|
||||
{"LOCAL", TK_LOCAL},
|
||||
{"MATCH", TK_MATCH},
|
||||
{"MAXROWS", TK_MAXROWS},
|
||||
{"MINROWS", TK_MINROWS},
|
||||
{"MINUS", TK_MINUS},
|
||||
{"MNODES", TK_MNODES},
|
||||
{"MODIFY", TK_MODIFY},
|
||||
{"MODULES", TK_MODULES},
|
||||
{"NCHAR", TK_NCHAR},
|
||||
{"NMATCH", TK_NMATCH},
|
||||
|
@ -102,9 +117,11 @@ static SKeyword keywordTable[] = {
|
|||
{"ON", TK_ON},
|
||||
{"OR", TK_OR},
|
||||
{"ORDER", TK_ORDER},
|
||||
{"OUTPUTTYPE", TK_OUTPUTTYPE},
|
||||
{"PARTITION", TK_PARTITION},
|
||||
{"PASS", TK_PASS},
|
||||
{"PORT", TK_PORT},
|
||||
{"PPS", TK_PPS},
|
||||
{"PRECISION", TK_PRECISION},
|
||||
{"PRIVILEGE", TK_PRIVILEGE},
|
||||
{"PREV", TK_PREV},
|
||||
|
@ -112,6 +129,8 @@ static SKeyword keywordTable[] = {
|
|||
{"QNODE", TK_QNODE},
|
||||
{"QNODES", TK_QNODES},
|
||||
{"QSTARTTS", TK_QSTARTTS},
|
||||
{"QTIME", TK_QTIME},
|
||||
{"QUERIES", TK_QUERIES},
|
||||
{"QUERY", TK_QUERY},
|
||||
{"QUORUM", TK_QUORUM},
|
||||
{"RATIO", TK_RATIO},
|
||||
|
@ -120,8 +139,10 @@ static SKeyword keywordTable[] = {
|
|||
{"RETENTIONS", TK_RETENTIONS},
|
||||
{"ROLLUP", TK_ROLLUP},
|
||||
{"ROWTS", TK_ROWTS},
|
||||
{"SCORES", TK_SCORES},
|
||||
{"SELECT", TK_SELECT},
|
||||
{"SESSION", TK_SESSION},
|
||||
{"SET", TK_SET},
|
||||
{"SHOW", TK_SHOW},
|
||||
{"SINGLE_STABLE", TK_SINGLE_STABLE},
|
||||
{"SLIDING", TK_SLIDING},
|
||||
|
@ -131,16 +152,23 @@ static SKeyword keywordTable[] = {
|
|||
{"SOFFSET", TK_SOFFSET},
|
||||
{"STABLE", TK_STABLE},
|
||||
{"STABLES", TK_STABLES},
|
||||
{"STATE", TK_STATE},
|
||||
{"STATE_WINDOW", TK_STATE_WINDOW},
|
||||
{"STORAGE", TK_STORAGE},
|
||||
{"STREAM", TK_STREAM},
|
||||
{"STREAMS", TK_STREAMS},
|
||||
{"STREAM_MODE", TK_STREAM_MODE},
|
||||
{"SYNCDB", TK_SYNCDB},
|
||||
{"TABLE", TK_TABLE},
|
||||
{"TABLES", TK_TABLES},
|
||||
{"TAG", TK_TAG},
|
||||
{"TAGS", TK_TAGS},
|
||||
{"TBNAME", TK_TBNAME},
|
||||
{"TIMESTAMP", TK_TIMESTAMP},
|
||||
{"TINYINT", TK_TINYINT},
|
||||
{"TOPIC", TK_TOPIC},
|
||||
{"TOPICS", TK_TOPICS},
|
||||
{"TSERIES", TK_TSERIES},
|
||||
{"TTL", TK_TTL},
|
||||
{"UNION", TK_UNION},
|
||||
{"UNSIGNED", TK_UNSIGNED},
|
||||
|
@ -150,8 +178,10 @@ static SKeyword keywordTable[] = {
|
|||
{"USING", TK_USING},
|
||||
{"VALUES", TK_VALUES},
|
||||
{"VARCHAR", TK_VARCHAR},
|
||||
{"VARIABLES", TK_VARIABLES},
|
||||
{"VERBOSE", TK_VERBOSE},
|
||||
{"VGROUPS", TK_VGROUPS},
|
||||
{"VNODES", TK_VNODES},
|
||||
{"WAL", TK_WAL},
|
||||
{"WDURATION", TK_WDURATION},
|
||||
{"WENDTS", TK_WENDTS},
|
||||
|
@ -182,22 +212,8 @@ static SKeyword keywordTable[] = {
|
|||
// {"UMINUS", TK_UMINUS},
|
||||
// {"UPLUS", TK_UPLUS},
|
||||
// {"BITNOT", TK_BITNOT},
|
||||
// {"ACCOUNTS", TK_ACCOUNTS},
|
||||
// {"QUERIES", TK_QUERIES},
|
||||
// {"CONNECTIONS", TK_CONNECTIONS},
|
||||
// {"VARIABLES", TK_VARIABLES},
|
||||
// {"SCORES", TK_SCORES},
|
||||
// {"GRANTS", TK_GRANTS},
|
||||
// {"DOT", TK_DOT},
|
||||
// {"SYNCDB", TK_SYNCDB},
|
||||
// {"LOCAL", TK_LOCAL},
|
||||
// {"PPS", TK_PPS},
|
||||
// {"TSERIES", TK_TSERIES},
|
||||
// {"DBS", TK_DBS},
|
||||
// {"STORAGE", TK_STORAGE},
|
||||
// {"QTIME", TK_QTIME},
|
||||
// {"CONNS", TK_CONNS},
|
||||
// {"STATE", TK_STATE},
|
||||
// {"CTIME", TK_CTIME},
|
||||
// {"LP", TK_LP},
|
||||
// {"RP", TK_RP},
|
||||
|
@ -205,15 +221,8 @@ static SKeyword keywordTable[] = {
|
|||
// {"EVERY", TK_EVERY},
|
||||
// {"VARIABLE", TK_VARIABLE},
|
||||
// {"UPDATE", TK_UPDATE},
|
||||
// {"ADD", TK_ADD},
|
||||
// {"COLUMN", TK_COLUMN},
|
||||
// {"TAG", TK_TAG},
|
||||
// {"CHANGE", TK_CHANGE},
|
||||
// {"SET", TK_SET},
|
||||
// {"KILL", TK_KILL},
|
||||
// {"CONNECTION", TK_CONNECTION},
|
||||
// {"COLON", TK_COLON},
|
||||
// {"STREAM", TK_STREAM},
|
||||
// {"ABORT", TK_ABORT},
|
||||
// {"AFTER", TK_AFTER},
|
||||
// {"ATTACH", TK_ATTACH},
|
||||
|
@ -244,14 +253,7 @@ static SKeyword keywordTable[] = {
|
|||
// {"TRIGGER", TK_TRIGGER},
|
||||
// {"VIEW", TK_VIEW},
|
||||
// {"SEMI", TK_SEMI},
|
||||
// {"VNODES", TK_VNODES},
|
||||
// {"PARTITIONS", TK_PARTITIONS},
|
||||
// {"TOPICS", TK_TOPICS},
|
||||
// {"COMPACT", TK_COMPACT},
|
||||
// {"MODIFY", TK_MODIFY},
|
||||
// {"OUTPUTTYPE", TK_OUTPUTTYPE},
|
||||
// {"AGGREGATE", TK_AGGREGATE},
|
||||
// {"BUFSIZE", TK_BUFSIZE},
|
||||
// {"PARTITIONS", TK_PARTITIONS},
|
||||
// {"MODE", TK_MODE},
|
||||
};
|
||||
|
||||
|
|
|
@ -240,7 +240,11 @@ static void setColumnInfoByExpr(const STableNode* pTable, SExprNode* pExpr, SCol
|
|||
if (NULL != pTable) {
|
||||
strcpy(pCol->tableAlias, pTable->tableAlias);
|
||||
} else if (QUERY_NODE_COLUMN == nodeType(pExpr)) {
|
||||
strcpy(pCol->tableAlias, ((SColumnNode*)pExpr)->tableAlias);
|
||||
SColumnNode* pProjCol = (SColumnNode*)pExpr;
|
||||
strcpy(pCol->tableAlias, pProjCol->tableAlias);
|
||||
pCol->tableId = pProjCol->tableId;
|
||||
pCol->colId = pProjCol->colId;
|
||||
pCol->colType = pProjCol->colType;
|
||||
}
|
||||
strcpy(pCol->colName, pExpr->aliasName);
|
||||
pCol->node.resType = pExpr->resType;
|
||||
|
@ -1095,10 +1099,6 @@ static int32_t columnDefNodeToField(SNodeList* pList, SArray** pArray) {
|
|||
}
|
||||
|
||||
static int32_t columnNodeToField(SNodeList* pList, SArray** pArray) {
|
||||
if (NULL == pList) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
*pArray = taosArrayInit(LIST_LENGTH(pList), sizeof(SField));
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, pList) {
|
||||
|
@ -1161,10 +1161,15 @@ static int32_t translateCreateSuperTable(STranslateContext* pCxt, SCreateTableSt
|
|||
createReq.delay = pStmt->pOptions->delay;
|
||||
columnDefNodeToField(pStmt->pCols, &createReq.pColumns);
|
||||
columnDefNodeToField(pStmt->pTags, &createReq.pTags);
|
||||
columnNodeToField(pStmt->pOptions->pSma, &createReq.pSmas);
|
||||
createReq.numOfColumns = LIST_LENGTH(pStmt->pCols);
|
||||
createReq.numOfTags = LIST_LENGTH(pStmt->pTags);
|
||||
createReq.numOfSmas = LIST_LENGTH(pStmt->pOptions->pSma);
|
||||
if (NULL == pStmt->pOptions->pSma) {
|
||||
columnDefNodeToField(pStmt->pCols, &createReq.pSmas);
|
||||
createReq.numOfSmas = createReq.numOfColumns;
|
||||
} else {
|
||||
columnNodeToField(pStmt->pOptions->pSma, &createReq.pSmas);
|
||||
createReq.numOfSmas = LIST_LENGTH(pStmt->pOptions->pSma);
|
||||
}
|
||||
|
||||
SName tableName = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId };
|
||||
strcpy(tableName.dbname, pStmt->dbName);
|
||||
|
@ -1470,20 +1475,20 @@ static int32_t translateAlterDnode(STranslateContext* pCxt, SAlterDnodeStmt* pSt
|
|||
|
||||
static int32_t nodeTypeToShowType(ENodeType nt) {
|
||||
switch (nt) {
|
||||
case QUERY_NODE_SHOW_DATABASES_STMT:
|
||||
return TSDB_MGMT_TABLE_DB;
|
||||
case QUERY_NODE_SHOW_STABLES_STMT:
|
||||
return TSDB_MGMT_TABLE_STB;
|
||||
case QUERY_NODE_SHOW_USERS_STMT:
|
||||
return TSDB_MGMT_TABLE_USER;
|
||||
case QUERY_NODE_SHOW_DNODES_STMT:
|
||||
return TSDB_MGMT_TABLE_DNODE;
|
||||
case QUERY_NODE_SHOW_VGROUPS_STMT:
|
||||
return TSDB_MGMT_TABLE_VGROUP;
|
||||
case QUERY_NODE_SHOW_MNODES_STMT:
|
||||
return TSDB_MGMT_TABLE_MNODE;
|
||||
case QUERY_NODE_SHOW_QNODES_STMT:
|
||||
return TSDB_MGMT_TABLE_QNODE;
|
||||
case QUERY_NODE_SHOW_APPS_STMT:
|
||||
return 0; // todo
|
||||
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
|
||||
return TSDB_MGMT_TABLE_CONNS;
|
||||
case QUERY_NODE_SHOW_LICENCE_STMT:
|
||||
return 0; // todo
|
||||
case QUERY_NODE_SHOW_QUERIES_STMT:
|
||||
return TSDB_MGMT_TABLE_QUERIES;
|
||||
case QUERY_NODE_SHOW_SCORES_STMT:
|
||||
return 0; // todo
|
||||
case QUERY_NODE_SHOW_TOPICS_STMT:
|
||||
return 0; // todo
|
||||
case QUERY_NODE_SHOW_VARIABLE_STMT:
|
||||
return TSDB_MGMT_TABLE_VARIABLES;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -1509,30 +1514,6 @@ static int32_t translateShow(STranslateContext* pCxt, SShowStmt* pStmt) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateShowTables(STranslateContext* pCxt) {
|
||||
SVShowTablesReq* pShowReq = taosMemoryCalloc(1, sizeof(SVShowTablesReq));
|
||||
|
||||
SArray* array = NULL;
|
||||
int32_t code = getDBVgInfo(pCxt, pCxt->pParseCxt->db, &array);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
SVgroupInfo* info = taosArrayGet(array, 0);
|
||||
pShowReq->head.vgId = htonl(info->vgId);
|
||||
|
||||
pCxt->pCmdMsg = taosMemoryMalloc(sizeof(SCmdMsgInfo));
|
||||
if (NULL == pCxt->pCmdMsg) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pCxt->pCmdMsg->epSet = info->epSet;
|
||||
pCxt->pCmdMsg->msgType = TDMT_VND_SHOW_TABLES;
|
||||
pCxt->pCmdMsg->msgLen = sizeof(SVShowTablesReq);
|
||||
pCxt->pCmdMsg->pMsg = pShowReq;
|
||||
pCxt->pCmdMsg->pExtension = array;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t getSmaIndexDstVgId(STranslateContext* pCxt, char* pTableName, int32_t* pVgId) {
|
||||
SVgroupInfo vg = {0};
|
||||
int32_t code = getTableHashVgroup(pCxt, pCxt->pParseCxt->db, pTableName, &vg);
|
||||
|
@ -1868,17 +1849,19 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
|
|||
case QUERY_NODE_ALTER_DNODE_STMT:
|
||||
code = translateAlterDnode(pCxt, (SAlterDnodeStmt*)pNode);
|
||||
break;
|
||||
case QUERY_NODE_SHOW_DATABASES_STMT:
|
||||
case QUERY_NODE_SHOW_STABLES_STMT:
|
||||
case QUERY_NODE_SHOW_USERS_STMT:
|
||||
case QUERY_NODE_SHOW_DNODES_STMT:
|
||||
case QUERY_NODE_SHOW_VGROUPS_STMT:
|
||||
case QUERY_NODE_SHOW_MNODES_STMT:
|
||||
case QUERY_NODE_SHOW_QNODES_STMT:
|
||||
case QUERY_NODE_SHOW_APPS_STMT:
|
||||
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
|
||||
case QUERY_NODE_SHOW_LICENCE_STMT:
|
||||
case QUERY_NODE_SHOW_QUERIES_STMT:
|
||||
case QUERY_NODE_SHOW_SCORES_STMT:
|
||||
case QUERY_NODE_SHOW_TOPICS_STMT:
|
||||
case QUERY_NODE_SHOW_VARIABLE_STMT:
|
||||
code = translateShow(pCxt, (SShowStmt*)pNode);
|
||||
break;
|
||||
case QUERY_NODE_SHOW_TABLES_STMT:
|
||||
code = translateShowTables(pCxt);
|
||||
case QUERY_NODE_SHOW_CREATE_DATABASE_STMT:
|
||||
case QUERY_NODE_SHOW_CREATE_TABLE_STMT:
|
||||
case QUERY_NODE_SHOW_CREATE_STABLE_STMT:
|
||||
// todo
|
||||
break;
|
||||
case QUERY_NODE_CREATE_INDEX_STMT:
|
||||
code = translateCreateIndex(pCxt, (SCreateIndexStmt*)pNode);
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -23,7 +23,7 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
|
|||
colDataSetNull_f(pOutputData->nullbitmap, i);
|
||||
continue;
|
||||
}
|
||||
out[i] = (in[i] > 0)? in[i] : -in[i];
|
||||
out[i] = (in[i] >= 0)? in[i] : -in[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -36,7 +36,7 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
|
|||
colDataSetNull_f(pOutputData->nullbitmap, i);
|
||||
continue;
|
||||
}
|
||||
out[i] = (in[i] > 0)? in[i] : -in[i];
|
||||
out[i] = (in[i] >= 0)? in[i] : -in[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -49,7 +49,7 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
|
|||
colDataSetNull_f(pOutputData->nullbitmap, i);
|
||||
continue;
|
||||
}
|
||||
out[i] = (in[i] > 0)? in[i] : -in[i];
|
||||
out[i] = (in[i] >= 0)? in[i] : -in[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -62,7 +62,7 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
|
|||
colDataSetNull_f(pOutputData->nullbitmap, i);
|
||||
continue;
|
||||
}
|
||||
out[i] = (in[i] > 0)? in[i] : -in[i];
|
||||
out[i] = (in[i] >= 0)? in[i] : -in[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -75,7 +75,7 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
|
|||
colDataSetNull_f(pOutputData->nullbitmap, i);
|
||||
continue;
|
||||
}
|
||||
out[i] = (in[i] > 0)? in[i] : -in[i];
|
||||
out[i] = (in[i] >= 0)? in[i] : -in[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -88,7 +88,7 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
|
|||
colDataSetNull_f(pOutputData->nullbitmap, i);
|
||||
continue;
|
||||
}
|
||||
out[i] = (in[i] > 0)? in[i] : -in[i];
|
||||
out[i] = (in[i] >= 0)? in[i] : -in[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -407,4 +407,4 @@ int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
|||
ASSERT(inputNum == 1);
|
||||
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t*) colDataGetData(pInput->columnData, 4));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,15 +29,15 @@ struct SBTree {
|
|||
int minLocal;
|
||||
int maxLeaf;
|
||||
int minLeaf;
|
||||
u8 *pTmp;
|
||||
void *pBuf;
|
||||
};
|
||||
|
||||
#define TDB_BTREE_PAGE_COMMON_HDR u8 flags;
|
||||
|
||||
#define TDB_BTREE_PAGE_GET_FLAGS(PAGE) (PAGE)->pData[0]
|
||||
#define TDB_BTREE_PAGE_GET_FLAGS(PAGE) (PAGE)->pData[0]
|
||||
#define TDB_BTREE_PAGE_SET_FLAGS(PAGE, flags) ((PAGE)->pData[0] = (flags))
|
||||
#define TDB_BTREE_PAGE_IS_ROOT(PAGE) (TDB_BTREE_PAGE_GET_FLAGS(PAGE) & TDB_BTREE_ROOT)
|
||||
#define TDB_BTREE_PAGE_IS_LEAF(PAGE) (TDB_BTREE_PAGE_GET_FLAGS(PAGE) & TDB_BTREE_LEAF)
|
||||
#define TDB_BTREE_PAGE_IS_ROOT(PAGE) (TDB_BTREE_PAGE_GET_FLAGS(PAGE) & TDB_BTREE_ROOT)
|
||||
#define TDB_BTREE_PAGE_IS_LEAF(PAGE) (TDB_BTREE_PAGE_GET_FLAGS(PAGE) & TDB_BTREE_LEAF)
|
||||
#define TDB_BTREE_ASSERT_FLAG(flags) \
|
||||
ASSERT(TDB_FLAG_IS(flags, TDB_BTREE_ROOT) || TDB_FLAG_IS(flags, TDB_BTREE_LEAF) || \
|
||||
TDB_FLAG_IS(flags, TDB_BTREE_ROOT | TDB_BTREE_LEAF) || TDB_FLAG_IS(flags, 0))
|
||||
|
@ -101,7 +101,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, FKeyComparator kcmpr, S
|
|||
// pBt->kcmpr
|
||||
pBt->kcmpr = kcmpr ? kcmpr : tdbDefaultKeyCmprFn;
|
||||
// pBt->pageSize
|
||||
pBt->pageSize = tdbPagerGetPageSize(pPager);
|
||||
pBt->pageSize = pPager->pageSize;
|
||||
// pBt->maxLocal
|
||||
pBt->maxLocal = tdbPageCapacity(pBt->pageSize, sizeof(SIntHdr)) / 4;
|
||||
// pBt->minLocal: Should not be allowed smaller than 15, which is [nPayload][nKey][nData]
|
||||
|
@ -127,132 +127,144 @@ int tdbBtreeClose(SBTree *pBt) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int tdbBtCursorInsert(SBTC *pBtc, const void *pKey, int kLen, const void *pVal, int vLen) {
|
||||
int ret;
|
||||
int idx;
|
||||
SPager *pPager;
|
||||
SCell *pCell;
|
||||
int szCell;
|
||||
int cret;
|
||||
SBTree *pBt;
|
||||
int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, int vLen) {
|
||||
SBTC btc;
|
||||
SCell *pCell;
|
||||
void *pBuf;
|
||||
int szCell;
|
||||
int szBuf;
|
||||
int ret;
|
||||
int idx;
|
||||
int c;
|
||||
|
||||
ret = tdbBtcMoveTo(pBtc, pKey, kLen, &cret);
|
||||
tdbBtcOpen(&btc, pBt);
|
||||
|
||||
// move to the position to insert
|
||||
ret = tdbBtcMoveTo(&btc, pKey, kLen, &c);
|
||||
if (ret < 0) {
|
||||
// TODO: handle error
|
||||
tdbBtcClose(&btc);
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pBtc->idx == -1) {
|
||||
ASSERT(TDB_PAGE_TOTAL_CELLS(pBtc->pPage) == 0);
|
||||
if (btc.idx == -1) {
|
||||
idx = 0;
|
||||
} else {
|
||||
if (cret > 0) {
|
||||
idx = pBtc->idx + 1;
|
||||
} else if (cret < 0) {
|
||||
idx = pBtc->idx;
|
||||
if (c > 0) {
|
||||
idx = btc.idx + 1;
|
||||
} else if (c < 0) {
|
||||
idx = btc.idx;
|
||||
} else {
|
||||
/* TODO */
|
||||
// TDB does NOT allow same key
|
||||
tdbBtcClose(&btc);
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: refact code here
|
||||
pBt = pBtc->pBt;
|
||||
if (!pBt->pTmp) {
|
||||
pBt->pTmp = (u8 *)tdbOsMalloc(pBt->pageSize);
|
||||
if (pBt->pTmp == NULL) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
pCell = pBt->pTmp;
|
||||
// make sure enough space to hold the cell
|
||||
szBuf = kLen + vLen + 14;
|
||||
pBuf = TDB_REALLOC(pBt->pBuf, pBt->pageSize > szBuf ? szBuf : pBt->pageSize);
|
||||
if (pBuf == NULL) {
|
||||
tdbBtcClose(&btc);
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
pBt->pBuf = pBuf;
|
||||
pCell = (SCell *)pBt->pBuf;
|
||||
|
||||
// Encode the cell
|
||||
ret = tdbBtreeEncodeCell(pBtc->pPage, pKey, kLen, pVal, vLen, pCell, &szCell);
|
||||
// encode cell
|
||||
ret = tdbBtreeEncodeCell(btc.pPage, pKey, kLen, pVal, vLen, pCell, &szCell);
|
||||
if (ret < 0) {
|
||||
tdbBtcClose(&btc);
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Insert the cell to the index
|
||||
ret = tdbPageInsertCell(pBtc->pPage, idx, pCell, szCell, 0);
|
||||
// mark the page dirty
|
||||
ret = tdbPagerWrite(pBt->pPager, btc.pPage);
|
||||
if (ret < 0) {
|
||||
tdbBtcClose(&btc);
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// If page is overflow, balance the tree
|
||||
if (pBtc->pPage->nOverflow > 0) {
|
||||
ret = tdbBtreeBalance(pBtc);
|
||||
// insert the cell
|
||||
ret = tdbPageInsertCell(btc.pPage, idx, pCell, szCell, 0);
|
||||
if (ret < 0) {
|
||||
tdbBtcClose(&btc);
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// check if need balance
|
||||
if (btc.pPage->nOverflow > 0) {
|
||||
ret = tdbBtreeBalance(&btc);
|
||||
if (ret < 0) {
|
||||
tdbBtcClose(&btc);
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
tdbBtcClose(&btc);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tdbBtreeGet(SBTree *pBt, const void *pKey, int kLen, void **ppVal, int *vLen) {
|
||||
SBTC btc;
|
||||
SCell *pCell;
|
||||
int cret;
|
||||
void *pVal;
|
||||
SCellDecoder cd;
|
||||
|
||||
tdbBtcOpen(&btc, pBt);
|
||||
|
||||
tdbBtcMoveTo(&btc, pKey, kLen, &cret);
|
||||
|
||||
if (cret) {
|
||||
return cret;
|
||||
}
|
||||
|
||||
pCell = tdbPageGetCell(btc.pPage, btc.idx);
|
||||
tdbBtreeDecodeCell(btc.pPage, pCell, &cd);
|
||||
|
||||
*vLen = cd.vLen;
|
||||
pVal = TDB_REALLOC(*ppVal, *vLen);
|
||||
if (pVal == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
*ppVal = pVal;
|
||||
memcpy(*ppVal, cd.pVal, cd.vLen);
|
||||
return 0;
|
||||
return tdbBtreePGet(pBt, pKey, kLen, NULL, NULL, ppVal, vLen);
|
||||
}
|
||||
|
||||
int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen) {
|
||||
SBTC btc;
|
||||
SCell *pCell;
|
||||
int cret;
|
||||
void *pTKey;
|
||||
void *pTVal;
|
||||
int ret;
|
||||
void *pTKey = NULL;
|
||||
void *pTVal = NULL;
|
||||
SCellDecoder cd;
|
||||
|
||||
tdbBtcOpen(&btc, pBt);
|
||||
|
||||
tdbBtcMoveTo(&btc, pKey, kLen, &cret);
|
||||
ret = tdbBtcMoveTo(&btc, pKey, kLen, &cret);
|
||||
if (ret < 0) {
|
||||
tdbBtcClose(&btc);
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (cret) {
|
||||
return cret;
|
||||
tdbBtcClose(&btc);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pCell = tdbPageGetCell(btc.pPage, btc.idx);
|
||||
tdbBtreeDecodeCell(btc.pPage, pCell, &cd);
|
||||
|
||||
pTKey = TDB_REALLOC(*ppKey, cd.kLen);
|
||||
pTVal = TDB_REALLOC(*ppVal, cd.vLen);
|
||||
|
||||
if (pTKey == NULL || pTVal == NULL) {
|
||||
TDB_FREE(pTKey);
|
||||
TDB_FREE(pTVal);
|
||||
if (ppKey) {
|
||||
pTKey = TDB_REALLOC(*ppKey, cd.kLen);
|
||||
if (pTKey == NULL) {
|
||||
tdbBtcClose(&btc);
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
*ppKey = pTKey;
|
||||
*pkLen = cd.kLen;
|
||||
memcpy(*ppKey, cd.pKey, cd.kLen);
|
||||
}
|
||||
|
||||
*ppKey = pTKey;
|
||||
pTVal = TDB_REALLOC(*ppVal, cd.vLen);
|
||||
if (pTVal == NULL) {
|
||||
tdbBtcClose(&btc);
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
*ppVal = pTVal;
|
||||
*pkLen = cd.kLen;
|
||||
*vLen = cd.vLen;
|
||||
|
||||
memcpy(*ppKey, cd.pKey, cd.kLen);
|
||||
memcpy(*ppVal, cd.pVal, cd.vLen);
|
||||
|
||||
tdbBtcClose(&btc);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -300,7 +312,8 @@ static int tdbBtreeOpenImpl(SBTree *pBt) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
// TODO: Unref the page
|
||||
// TODO: here still has problem
|
||||
tdbPagerReturnPage(pBt->pPager, pPage);
|
||||
|
||||
ASSERT(pgno != 0);
|
||||
pBt->root = pgno;
|
||||
|
@ -371,17 +384,7 @@ static int tdbBtreeZeroPage(SPage *pPage, void *arg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
#ifndef TDB_BTREE_BALANCE
|
||||
typedef struct {
|
||||
SBTree *pBt;
|
||||
SPage *pParent;
|
||||
int idx;
|
||||
i8 nOld;
|
||||
SPage *pOldPages[3];
|
||||
i8 nNewPages;
|
||||
SPage *pNewPages[5];
|
||||
} SBtreeBalanceHelper;
|
||||
|
||||
// TDB_BTREE_BALANCE =====================
|
||||
static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild) {
|
||||
SPager *pPager;
|
||||
SPage *pChild;
|
||||
|
@ -408,6 +411,13 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild) {
|
|||
((SIntHdr *)pChild->pData)->pgno = ((SIntHdr *)(pRoot->pData))->pgno;
|
||||
}
|
||||
|
||||
ret = tdbPagerWrite(pPager, pChild);
|
||||
if (ret < 0) {
|
||||
// TODO
|
||||
ASSERT(0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Copy the root page content to the child page
|
||||
tdbPageCopy(pRoot, pChild);
|
||||
|
||||
|
@ -472,6 +482,13 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
|
|||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
ret = tdbPagerWrite(pBt->pPager, pOlds[i]);
|
||||
if (ret < 0) {
|
||||
// TODO
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
// copy the parent key out if child pages are not leaf page
|
||||
childNotLeaf = !TDB_BTREE_PAGE_IS_LEAF(pOlds[0]);
|
||||
|
@ -492,6 +509,14 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
|
|||
}
|
||||
rPgno = ((SIntHdr *)pOlds[nOlds - 1]->pData)->pgno;
|
||||
}
|
||||
|
||||
ret = tdbPagerWrite(pBt->pPager, pParent);
|
||||
if (ret < 0) {
|
||||
// TODO
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// drop the cells on parent page
|
||||
for (int i = 0; i < nOlds; i++) {
|
||||
nCells = TDB_PAGE_TOTAL_CELLS(pParent);
|
||||
|
@ -619,6 +644,13 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
|
|||
if (ret < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
ret = tdbPagerWrite(pBt->pPager, pNews[iNew]);
|
||||
if (ret < 0) {
|
||||
// TODO
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -732,14 +764,24 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: here is not corrent for drop case
|
||||
for (int i = 0; i < nNews; i++) {
|
||||
if (i < nOlds) {
|
||||
tdbPagerReturnPage(pBt->pPager, pOlds[i]);
|
||||
} else {
|
||||
tdbPagerReturnPage(pBt->pPager, pNews[i]);
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tdbBtreeBalance(SBTC *pBtc) {
|
||||
int iPage;
|
||||
int ret;
|
||||
int nFree;
|
||||
SPage *pParent;
|
||||
SPage *pPage;
|
||||
int ret;
|
||||
u8 flags;
|
||||
u8 leaf;
|
||||
u8 root;
|
||||
|
@ -750,10 +792,11 @@ static int tdbBtreeBalance(SBTC *pBtc) {
|
|||
pPage = pBtc->pPage;
|
||||
leaf = TDB_BTREE_PAGE_IS_LEAF(pPage);
|
||||
root = TDB_BTREE_PAGE_IS_ROOT(pPage);
|
||||
nFree = TDB_PAGE_FREE_SIZE(pPage);
|
||||
|
||||
// when the page is not overflow and not too empty, the balance work
|
||||
// is finished. Just break out the balance loop.
|
||||
if (pPage->nOverflow == 0 /* TODO: && pPage->nFree <= */) {
|
||||
if (pPage->nOverflow == 0 && nFree < TDB_PAGE_USABLE_SIZE(pPage) * 2 / 3) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -781,6 +824,8 @@ static int tdbBtreeBalance(SBTC *pBtc) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage);
|
||||
|
||||
pBtc->iPage--;
|
||||
pBtc->pPage = pBtc->pgStack[pBtc->iPage];
|
||||
}
|
||||
|
@ -788,7 +833,7 @@ static int tdbBtreeBalance(SBTC *pBtc) {
|
|||
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
// TDB_BTREE_BALANCE
|
||||
|
||||
// TDB_BTREE_CELL =====================
|
||||
static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const void *pKey, int kLen, const void *pVal,
|
||||
|
@ -1028,12 +1073,11 @@ int tdbBtcMoveToFirst(SBTC *pBtc) {
|
|||
|
||||
// move upward
|
||||
for (;;) {
|
||||
if (pBtc->iPage == 0) {
|
||||
if (pBtc->iPage == iPage) {
|
||||
pBtc->idx = 0;
|
||||
break;
|
||||
}
|
||||
|
||||
if (pBtc->iPage < iPage) break;
|
||||
tdbBtcMoveUpward(pBtc);
|
||||
}
|
||||
}
|
||||
|
@ -1056,6 +1100,7 @@ int tdbBtcMoveToFirst(SBTC *pBtc) {
|
|||
|
||||
int tdbBtcMoveToLast(SBTC *pBtc) {
|
||||
int ret;
|
||||
int nCells;
|
||||
SBTree *pBt;
|
||||
SPager *pPager;
|
||||
SPgno pgno;
|
||||
|
@ -1071,27 +1116,56 @@ int tdbBtcMoveToLast(SBTC *pBtc) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
nCells = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
|
||||
pBtc->iPage = 0;
|
||||
if (nCells > 0) {
|
||||
pBtc->idx = TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage) ? nCells - 1 : nCells;
|
||||
} else {
|
||||
// no data at all, point to an invalid position
|
||||
ASSERT(TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage));
|
||||
pBtc->idx = -1;
|
||||
return 0;
|
||||
}
|
||||
} else {
|
||||
// move from a position
|
||||
ASSERT(0);
|
||||
int iPage = 0;
|
||||
|
||||
// downward search
|
||||
for (; iPage < pBtc->iPage; iPage++) {
|
||||
ASSERT(!TDB_BTREE_PAGE_IS_LEAF(pBtc->pgStack[iPage]));
|
||||
nCells = TDB_PAGE_TOTAL_CELLS(pBtc->pgStack[iPage]);
|
||||
if (pBtc->idxStack[iPage] != nCells) break;
|
||||
}
|
||||
|
||||
// move upward
|
||||
for (;;) {
|
||||
if (pBtc->iPage == iPage) {
|
||||
if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) {
|
||||
pBtc->idx = TDB_PAGE_TOTAL_CELLS(pBtc->pPage) - 1;
|
||||
} else {
|
||||
pBtc->idx = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
tdbBtcMoveUpward(pBtc);
|
||||
}
|
||||
}
|
||||
|
||||
// move downward
|
||||
for (;;) {
|
||||
if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) {
|
||||
// TODO: handle empty case
|
||||
ASSERT(TDB_PAGE_TOTAL_CELLS(pBtc->pPage) > 0);
|
||||
pBtc->idx = TDB_PAGE_TOTAL_CELLS(pBtc->pPage) - 1;
|
||||
break;
|
||||
} else {
|
||||
pBtc->idx = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
|
||||
if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) break;
|
||||
|
||||
ret = tdbBtcMoveDownward(pBtc);
|
||||
if (ret < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
ret = tdbBtcMoveDownward(pBtc);
|
||||
if (ret < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
nCells = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
|
||||
if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) {
|
||||
pBtc->idx = nCells - 1;
|
||||
} else {
|
||||
pBtc->idx = nCells;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1104,6 +1178,7 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
|
|||
void *pKey, *pVal;
|
||||
int ret;
|
||||
|
||||
// current cursor points to an invalid position
|
||||
if (pBtc->idx < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -1134,12 +1209,17 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
|
|||
memcpy(pVal, cd.pVal, cd.vLen);
|
||||
|
||||
ret = tdbBtcMoveToNext(pBtc);
|
||||
if (ret < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tdbBtcMoveToNext(SBTC *pBtc) {
|
||||
int nCells;
|
||||
int ret;
|
||||
SCell *pCell;
|
||||
|
||||
ASSERT(TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage));
|
||||
|
@ -1151,39 +1231,35 @@ static int tdbBtcMoveToNext(SBTC *pBtc) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
if (pBtc->iPage == 0) {
|
||||
pBtc->idx = -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Move upward
|
||||
// move upward
|
||||
for (;;) {
|
||||
tdbBtcMoveUpward(pBtc);
|
||||
pBtc->idx++;
|
||||
|
||||
nCells = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
|
||||
if (pBtc->idx <= nCells) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (pBtc->iPage == 0) {
|
||||
pBtc->idx = -1;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
// Move downward
|
||||
for (;;) {
|
||||
nCells = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
|
||||
tdbBtcMoveUpward(pBtc);
|
||||
pBtc->idx++;
|
||||
|
||||
tdbBtcMoveDownward(pBtc);
|
||||
pBtc->idx = 0;
|
||||
|
||||
if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) {
|
||||
ASSERT(!TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage));
|
||||
if (pBtc->idx <= TDB_PAGE_TOTAL_CELLS(pBtc->pPage)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// move downward
|
||||
for (;;) {
|
||||
if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) break;
|
||||
|
||||
ret = tdbBtcMoveDownward(pBtc);
|
||||
if (ret < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pBtc->idx = 0;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1230,91 +1306,145 @@ static int tdbBtcMoveUpward(SBTC *pBtc) {
|
|||
}
|
||||
|
||||
static int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
|
||||
int ret;
|
||||
SBTree *pBt;
|
||||
SPager *pPager;
|
||||
int ret;
|
||||
int nCells;
|
||||
int c;
|
||||
SBTree *pBt;
|
||||
SCell *pCell;
|
||||
SPager *pPager;
|
||||
SCellDecoder cd = {0};
|
||||
|
||||
pBt = pBtc->pBt;
|
||||
pPager = pBt->pPager;
|
||||
|
||||
if (pBtc->iPage < 0) {
|
||||
ASSERT(pBtc->iPage == -1);
|
||||
ASSERT(pBtc->idx == -1);
|
||||
|
||||
// Move from the root
|
||||
// move from a clear cursor
|
||||
ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt);
|
||||
if (ret < 0) {
|
||||
// TODO
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pBtc->iPage = 0;
|
||||
|
||||
if (TDB_PAGE_TOTAL_CELLS(pBtc->pPage) == 0) {
|
||||
// Current page is empty
|
||||
// ASSERT(TDB_FLAG_IS(TDB_PAGE_FLAGS(pBtc->pPage), TDB_BTREE_ROOT | TDB_BTREE_LEAF));
|
||||
return 0;
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
int lidx, ridx, midx, c, nCells;
|
||||
SCell *pCell;
|
||||
SPage *pPage;
|
||||
SCellDecoder cd = {0};
|
||||
pBtc->iPage = 0;
|
||||
pBtc->idx = -1;
|
||||
// for empty tree, just return with an invalid position
|
||||
if (TDB_PAGE_TOTAL_CELLS(pBtc->pPage) == 0) return 0;
|
||||
} else {
|
||||
SPage *pPage;
|
||||
int idx;
|
||||
int iPage = 0;
|
||||
|
||||
pPage = pBtc->pPage;
|
||||
// downward search
|
||||
for (; iPage < pBtc->iPage; iPage++) {
|
||||
pPage = pBtc->pgStack[iPage];
|
||||
idx = pBtc->idxStack[iPage];
|
||||
nCells = TDB_PAGE_TOTAL_CELLS(pPage);
|
||||
lidx = 0;
|
||||
ridx = nCells - 1;
|
||||
|
||||
ASSERT(nCells > 0);
|
||||
ASSERT(!TDB_BTREE_PAGE_IS_LEAF(pPage));
|
||||
|
||||
for (;;) {
|
||||
if (lidx > ridx) break;
|
||||
|
||||
midx = (lidx + ridx) >> 1;
|
||||
|
||||
pCell = tdbPageGetCell(pPage, midx);
|
||||
ret = tdbBtreeDecodeCell(pPage, pCell, &cd);
|
||||
if (ret < 0) {
|
||||
// TODO: handle error
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Compare the key values
|
||||
// check if key <= current position
|
||||
if (idx < nCells) {
|
||||
pCell = tdbPageGetCell(pPage, idx);
|
||||
tdbBtreeDecodeCell(pPage, pCell, &cd);
|
||||
c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen);
|
||||
if (c < 0) {
|
||||
/* input-key < cell-key */
|
||||
ridx = midx - 1;
|
||||
} else if (c > 0) {
|
||||
/* input-key > cell-key */
|
||||
lidx = midx + 1;
|
||||
} else {
|
||||
/* input-key == cell-key */
|
||||
break;
|
||||
}
|
||||
if (c > 0) break;
|
||||
}
|
||||
|
||||
// Move downward or break
|
||||
u8 leaf = TDB_BTREE_PAGE_IS_LEAF(pPage);
|
||||
if (leaf) {
|
||||
pBtc->idx = midx;
|
||||
*pCRst = c;
|
||||
break;
|
||||
} else {
|
||||
if (c <= 0) {
|
||||
pBtc->idx = midx;
|
||||
} else {
|
||||
pBtc->idx = midx + 1;
|
||||
}
|
||||
tdbBtcMoveDownward(pBtc);
|
||||
// check if key > current - 1 position
|
||||
if (idx > 0) {
|
||||
pCell = tdbPageGetCell(pPage, idx - 1);
|
||||
tdbBtreeDecodeCell(pPage, pCell, &cd);
|
||||
c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen);
|
||||
if (c <= 0) break;
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
// TODO: Move the cursor from a some position instead of a clear state
|
||||
ASSERT(0);
|
||||
// move upward
|
||||
for (;;) {
|
||||
if (pBtc->iPage == iPage) break;
|
||||
tdbBtcMoveUpward(pBtc);
|
||||
}
|
||||
}
|
||||
|
||||
// search downward to the leaf
|
||||
for (;;) {
|
||||
int lidx, ridx, midx;
|
||||
SPage *pPage;
|
||||
|
||||
pPage = pBtc->pPage;
|
||||
nCells = TDB_PAGE_TOTAL_CELLS(pPage);
|
||||
lidx = 0;
|
||||
ridx = nCells - 1;
|
||||
|
||||
ASSERT(nCells > 0);
|
||||
ASSERT(pBtc->idx == -1);
|
||||
|
||||
// compare first cell
|
||||
midx = lidx;
|
||||
pCell = tdbPageGetCell(pPage, midx);
|
||||
tdbBtreeDecodeCell(pPage, pCell, &cd);
|
||||
c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen);
|
||||
if (c <= 0) {
|
||||
ridx = lidx - 1;
|
||||
} else {
|
||||
lidx = lidx + 1;
|
||||
}
|
||||
|
||||
// compare last cell
|
||||
if (lidx <= ridx) {
|
||||
midx = ridx;
|
||||
pCell = tdbPageGetCell(pPage, midx);
|
||||
tdbBtreeDecodeCell(pPage, pCell, &cd);
|
||||
c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen);
|
||||
if (c >= 0) {
|
||||
lidx = ridx + 1;
|
||||
} else {
|
||||
ridx = ridx - 1;
|
||||
}
|
||||
}
|
||||
|
||||
// binary search
|
||||
for (;;) {
|
||||
if (lidx > ridx) break;
|
||||
|
||||
midx = (lidx + ridx) >> 1;
|
||||
|
||||
pCell = tdbPageGetCell(pPage, midx);
|
||||
ret = tdbBtreeDecodeCell(pPage, pCell, &cd);
|
||||
if (ret < 0) {
|
||||
// TODO: handle error
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Compare the key values
|
||||
c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen);
|
||||
if (c < 0) {
|
||||
// pKey < cd.pKey
|
||||
ridx = midx - 1;
|
||||
} else if (c > 0) {
|
||||
// pKey > cd.pKey
|
||||
lidx = midx + 1;
|
||||
} else {
|
||||
// pKey == cd.pKey
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// keep search downward or break
|
||||
if (TDB_BTREE_PAGE_IS_LEAF(pPage)) {
|
||||
pBtc->idx = midx;
|
||||
*pCRst = c;
|
||||
break;
|
||||
} else {
|
||||
if (c <= 0) {
|
||||
pBtc->idx = midx;
|
||||
} else {
|
||||
pBtc->idx = midx + 1;
|
||||
}
|
||||
tdbBtcMoveDownward(pBtc);
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -49,6 +49,8 @@ int tdbDbOpen(const char *fname, int keyLen, int valLen, FKeyComparator keyCmprF
|
|||
if (ret < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
tdbEnvAddPager(pEnv, pPager);
|
||||
}
|
||||
|
||||
ASSERT(pPager != NULL);
|
||||
|
@ -74,22 +76,7 @@ int tdbDbDrop(TDB *pDb) {
|
|||
}
|
||||
|
||||
int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen) {
|
||||
SBTC btc;
|
||||
SBTC *pCur;
|
||||
int ret;
|
||||
|
||||
pCur = &btc;
|
||||
ret = tdbBtcOpen(pCur, pDb->pBt);
|
||||
if (ret < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
ret = tdbBtCursorInsert(pCur, pKey, keyLen, pVal, valLen);
|
||||
if (ret < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
return tdbBtreeInsert(pDb->pBt, pKey, keyLen, pVal, valLen);
|
||||
}
|
||||
|
||||
int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen) {
|
||||
|
|
|
@ -19,6 +19,7 @@ int tdbEnvOpen(const char *rootDir, int pageSize, int cacheSize, TENV **ppEnv) {
|
|||
TENV *pEnv;
|
||||
int dsize;
|
||||
int zsize;
|
||||
int tsize;
|
||||
u8 *pPtr;
|
||||
int ret;
|
||||
|
||||
|
@ -53,6 +54,14 @@ int tdbEnvOpen(const char *rootDir, int pageSize, int cacheSize, TENV **ppEnv) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
pEnv->nPgrHash = 8;
|
||||
tsize = sizeof(SPager *) * pEnv->nPgrHash;
|
||||
pEnv->pgrHash = TDB_REALLOC(pEnv->pgrHash, tsize);
|
||||
if (pEnv->pgrHash == NULL) {
|
||||
return -1;
|
||||
}
|
||||
memset(pEnv->pgrHash, 0, tsize);
|
||||
|
||||
mkdir(rootDir, 0755);
|
||||
|
||||
*ppEnv = pEnv;
|
||||
|
@ -64,7 +73,99 @@ int tdbEnvClose(TENV *pEnv) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int tdbBegin(TENV *pEnv) {
|
||||
SPager *pPager;
|
||||
int ret;
|
||||
|
||||
for (pPager = pEnv->pgrList; pPager; pPager = pPager->pNext) {
|
||||
ret = tdbPagerBegin(pPager);
|
||||
if (ret < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tdbCommit(TENV *pEnv) {
|
||||
SPager *pPager;
|
||||
int ret;
|
||||
|
||||
for (pPager = pEnv->pgrList; pPager; pPager = pPager->pNext) {
|
||||
ret = tdbPagerCommit(pPager);
|
||||
if (ret < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tdbRollback(TENV *pEnv) {
|
||||
ASSERT(0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
SPager *tdbEnvGetPager(TENV *pEnv, const char *fname) {
|
||||
// TODO
|
||||
return NULL;
|
||||
u32 hash;
|
||||
SPager **ppPager;
|
||||
|
||||
hash = tdbCstringHash(fname);
|
||||
ppPager = &pEnv->pgrHash[hash % pEnv->nPgrHash];
|
||||
for (; *ppPager && (strcmp(fname, (*ppPager)->dbFileName) != 0); ppPager = &((*ppPager)->pHashNext)) {
|
||||
}
|
||||
|
||||
return *ppPager;
|
||||
}
|
||||
|
||||
void tdbEnvAddPager(TENV *pEnv, SPager *pPager) {
|
||||
u32 hash;
|
||||
SPager **ppPager;
|
||||
|
||||
// rehash if neccessary
|
||||
if (pEnv->nPager + 1 > pEnv->nPgrHash) {
|
||||
// TODO
|
||||
}
|
||||
|
||||
// add to list
|
||||
pPager->pNext = pEnv->pgrList;
|
||||
pEnv->pgrList = pPager;
|
||||
|
||||
// add to hash
|
||||
hash = tdbCstringHash(pPager->dbFileName);
|
||||
ppPager = &pEnv->pgrHash[hash % pEnv->nPgrHash];
|
||||
pPager->pHashNext = *ppPager;
|
||||
*ppPager = pPager;
|
||||
|
||||
// increase the counter
|
||||
pEnv->nPager++;
|
||||
}
|
||||
|
||||
void tdbEnvRemovePager(TENV *pEnv, SPager *pPager) {
|
||||
u32 hash;
|
||||
SPager **ppPager;
|
||||
|
||||
// remove from the list
|
||||
for (ppPager = &pEnv->pgrList; *ppPager && (*ppPager != pPager); ppPager = &((*ppPager)->pNext)) {
|
||||
}
|
||||
ASSERT(*ppPager == pPager);
|
||||
*ppPager = pPager->pNext;
|
||||
|
||||
// remove from hash
|
||||
hash = tdbCstringHash(pPager->dbFileName);
|
||||
ppPager = &pEnv->pgrHash[hash % pEnv->nPgrHash];
|
||||
for (; *ppPager && *ppPager != pPager; ppPager = &((*ppPager)->pHashNext)) {
|
||||
}
|
||||
ASSERT(*ppPager == pPager);
|
||||
*ppPager = pPager->pNext;
|
||||
|
||||
// decrease the counter
|
||||
pEnv->nPager--;
|
||||
|
||||
// rehash if necessary
|
||||
if (pEnv->nPgrHash > 8 && pEnv->nPager < pEnv->nPgrHash / 2) {
|
||||
// TODO
|
||||
}
|
||||
}
|
|
@ -34,18 +34,6 @@ struct SPCache {
|
|||
})
|
||||
#define PAGE_IS_PINNED(pPage) ((pPage)->pLruNext == NULL)
|
||||
|
||||
// For page ref
|
||||
#define TDB_INIT_PAGE_REF(pPage) ((pPage)->nRef = 0)
|
||||
#if 0
|
||||
#define TDB_REF_PAGE(pPage) (++(pPage)->nRef)
|
||||
#define TDB_UNREF_PAGE(pPage) (--(pPage)->nRef)
|
||||
#define TDB_GET_PAGE_REF(pPage) ((pPage)->nRef)
|
||||
#else
|
||||
#define TDB_REF_PAGE(pPage) atomic_add_fetch_32(&((pPage)->nRef), 1)
|
||||
#define TDB_UNREF_PAGE(pPage) atomic_sub_fetch_32(&((pPage)->nRef), 1)
|
||||
#define TDB_GET_PAGE_REF(pPage) atomic_load_32(&((pPage)->nRef))
|
||||
#endif
|
||||
|
||||
static int tdbPCacheOpenImpl(SPCache *pCache);
|
||||
static void tdbPCacheInitLock(SPCache *pCache);
|
||||
static void tdbPCacheClearLock(SPCache *pCache);
|
||||
|
@ -107,12 +95,7 @@ void tdbPCacheRelease(SPCache *pCache, SPage *pPage) {
|
|||
ASSERT(nRef >= 0);
|
||||
|
||||
if (nRef == 0) {
|
||||
if (1 /*TODO: page still clean*/) {
|
||||
tdbPCacheUnpinPage(pCache, pPage);
|
||||
} else {
|
||||
// TODO
|
||||
ASSERT(0);
|
||||
}
|
||||
tdbPCacheUnpinPage(pCache, pPage);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -192,6 +175,8 @@ static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage) {
|
|||
|
||||
tdbPCacheLock(pCache);
|
||||
|
||||
ASSERT(!pPage->isDirty);
|
||||
|
||||
nRef = TDB_GET_PAGE_REF(pPage);
|
||||
ASSERT(nRef >= 0);
|
||||
if (nRef == 0) {
|
||||
|
|
|
@ -15,20 +15,6 @@
|
|||
|
||||
#include "tdbInt.h"
|
||||
|
||||
struct SPager {
|
||||
char *dbFileName;
|
||||
char *jFileName;
|
||||
int pageSize;
|
||||
uint8_t fid[TDB_FILE_ID_LEN];
|
||||
tdb_fd_t fd;
|
||||
tdb_fd_t jfd;
|
||||
SPCache *pCache;
|
||||
SPgno dbFileSize;
|
||||
SPgno dbOrigSize;
|
||||
SPage *pDirty;
|
||||
u8 inTran;
|
||||
};
|
||||
|
||||
typedef struct __attribute__((__packed__)) {
|
||||
u8 hdrString[16];
|
||||
u16 pageSize;
|
||||
|
@ -41,9 +27,8 @@ TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct")
|
|||
|
||||
#define TDB_PAGE_INITIALIZED(pPage) ((pPage)->pPager != NULL)
|
||||
|
||||
static int tdbPagerReadPage(SPager *pPager, SPage *pPage);
|
||||
static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno);
|
||||
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *), void *arg);
|
||||
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *), void *arg, u8 loadPage);
|
||||
static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage);
|
||||
static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage);
|
||||
|
||||
|
@ -131,26 +116,36 @@ int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate) {
|
|||
}
|
||||
|
||||
int tdbPagerWrite(SPager *pPager, SPage *pPage) {
|
||||
int ret;
|
||||
int ret;
|
||||
SPage **ppPage;
|
||||
|
||||
ASSERT(pPager->inTran);
|
||||
#if 0
|
||||
if (pPager->inTran == 0) {
|
||||
ret = tdbPagerBegin(pPager);
|
||||
if (ret < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
if (pPage->isDirty) return 0;
|
||||
|
||||
// ref page one more time so the page will not be release
|
||||
TDB_REF_PAGE(pPage);
|
||||
|
||||
// Set page as dirty
|
||||
pPage->isDirty = 1;
|
||||
|
||||
// Add page to dirty list
|
||||
// TODO: sort the list according to the page number
|
||||
pPage->pDirtyNext = pPager->pDirty;
|
||||
pPager->pDirty = pPage;
|
||||
// Add page to dirty list(TODO: NOT use O(n^2) algorithm)
|
||||
for (ppPage = &pPager->pDirty; (*ppPage) && TDB_PAGE_PGNO(*ppPage) < TDB_PAGE_PGNO(pPage);
|
||||
ppPage = &((*ppPage)->pDirtyNext)) {
|
||||
}
|
||||
ASSERT(*ppPage == NULL || TDB_PAGE_PGNO(*ppPage) > TDB_PAGE_PGNO(pPage));
|
||||
pPage->pDirtyNext = *ppPage;
|
||||
*ppPage = pPage;
|
||||
|
||||
// Write page to journal
|
||||
// Write page to journal if neccessary
|
||||
if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize) {
|
||||
ret = tdbPagerWritePageToJournal(pPager, pPage);
|
||||
if (ret < 0) {
|
||||
|
@ -184,54 +179,46 @@ int tdbPagerCommit(SPager *pPager) {
|
|||
SPage *pPage;
|
||||
int ret;
|
||||
|
||||
// Begin commit
|
||||
{
|
||||
// TODO: Sync the journal file (Here or when write ?)
|
||||
// sync the journal file
|
||||
ret = tdbOsFSync(pPager->jfd);
|
||||
if (ret < 0) {
|
||||
// TODO
|
||||
ASSERT(0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
pPage = pPager->pDirty;
|
||||
|
||||
if (pPage == NULL) break;
|
||||
|
||||
// loop to write the dirty pages to file
|
||||
for (pPage = pPager->pDirty; pPage; pPage = pPage->pDirtyNext) {
|
||||
// TODO: update the page footer
|
||||
ret = tdbPagerWritePageToDB(pPager, pPage);
|
||||
if (ret < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// release the page
|
||||
for (pPage = pPager->pDirty; pPage; pPage = pPager->pDirty) {
|
||||
pPager->pDirty = pPage->pDirtyNext;
|
||||
pPage->pDirtyNext = NULL;
|
||||
|
||||
// TODO: release the page
|
||||
pPage->isDirty = 0;
|
||||
|
||||
tdbPCacheRelease(pPager->pCache, pPage);
|
||||
}
|
||||
|
||||
// sync the db file
|
||||
tdbOsFSync(pPager->fd);
|
||||
|
||||
// remote the journal file
|
||||
tdbOsClose(pPager->jfd);
|
||||
tdbOsRemove(pPager->jFileName);
|
||||
// pPager->jfd = -1;
|
||||
pPager->dbOrigSize = pPager->dbFileSize;
|
||||
pPager->inTran = 0;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tdbPagerReadPage(SPager *pPager, SPage *pPage) {
|
||||
i64 offset;
|
||||
int ret;
|
||||
|
||||
ASSERT(memcmp(pPager->fid, pPage->pgid.fileid, TDB_FILE_ID_LEN) == 0);
|
||||
|
||||
offset = (pPage->pgid.pgno - 1) * (i64)(pPager->pageSize);
|
||||
ret = tdbOsPRead(pPager->fd, pPage->pData, pPager->pageSize, offset);
|
||||
if (ret < 0) {
|
||||
// TODO: handle error
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tdbPagerGetPageSize(SPager *pPager) { return pPager->pageSize; }
|
||||
|
||||
int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg) {
|
||||
SPage *pPage;
|
||||
SPgid pgid;
|
||||
|
@ -247,7 +234,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage
|
|||
|
||||
// Initialize the page if need
|
||||
if (!TDB_PAGE_INITIALIZED(pPage)) {
|
||||
ret = tdbPagerInitPage(pPager, pPage, initPage, arg);
|
||||
ret = tdbPagerInitPage(pPager, pPage, initPage, arg, 1);
|
||||
if (ret < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -284,7 +271,7 @@ int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage
|
|||
ASSERT(!TDB_PAGE_INITIALIZED(pPage));
|
||||
|
||||
// Initialize the page if need
|
||||
ret = tdbPagerInitPage(pPager, pPage, initPage, arg);
|
||||
ret = tdbPagerInitPage(pPager, pPage, initPage, arg, 0);
|
||||
if (ret < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -332,10 +319,11 @@ static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *), void *arg) {
|
||||
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *), void *arg, u8 loadPage) {
|
||||
int ret;
|
||||
int lcode;
|
||||
int nLoops;
|
||||
i64 nRead;
|
||||
|
||||
lcode = TDB_TRY_LOCK_PAGE(pPage);
|
||||
if (lcode == P_LOCK_SUCC) {
|
||||
|
@ -344,6 +332,19 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
|
|||
return 0;
|
||||
}
|
||||
|
||||
if (loadPage) {
|
||||
nRead = tdbOsPRead(pPager->fd, pPage->pData, pPage->pageSize, ((i64)pPage->pageSize) * TDB_PAGE_PGNO(pPage));
|
||||
if (nRead < 0) {
|
||||
// TODO
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
} else if (nRead < pPage->pageSize) {
|
||||
// TODO
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
ret = (*initPage)(pPage, arg);
|
||||
if (ret < 0) {
|
||||
TDB_UNLOCK_PAGE(pPage);
|
||||
|
|
|
@ -15,17 +15,29 @@
|
|||
|
||||
#include "tdbInt.h"
|
||||
|
||||
int tdbTxnBegin(TENV *pEnv) {
|
||||
// TODO
|
||||
return 0;
|
||||
}
|
||||
// int tdbTxnBegin(TENV *pEnv) {
|
||||
// // TODO
|
||||
// return 0;
|
||||
// }
|
||||
|
||||
int tdbTxnCommit(TENV *pEnv) {
|
||||
// TODO
|
||||
return 0;
|
||||
}
|
||||
// int tdbTxnCommit(TENV *pEnv) {
|
||||
// SPager *pPager = NULL;
|
||||
// int ret;
|
||||
|
||||
int tdbTxnRollback(TENV *pEnv) {
|
||||
// TODO
|
||||
return 0;
|
||||
}
|
||||
// for (;;) {
|
||||
// break;
|
||||
// ret = tdbPagerCommit(pPager);
|
||||
// if (ret < 0) {
|
||||
// ASSERT(0);
|
||||
// return -1;
|
||||
// }
|
||||
// }
|
||||
|
||||
// // TODO
|
||||
// return 0;
|
||||
// }
|
||||
|
||||
// int tdbTxnRollback(TENV *pEnv) {
|
||||
// // TODO
|
||||
// return 0;
|
||||
// }
|
|
@ -40,7 +40,7 @@ struct SBTC {
|
|||
// SBTree
|
||||
int tdbBtreeOpen(int keyLen, int valLen, SPager *pFile, FKeyComparator kcmpr, SBTree **ppBt);
|
||||
int tdbBtreeClose(SBTree *pBt);
|
||||
int tdbBtCursorInsert(SBTC *pCur, const void *pKey, int kLen, const void *pVal, int vLen);
|
||||
int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, int vLen);
|
||||
int tdbBtreeGet(SBTree *pBt, const void *pKey, int kLen, void **ppVal, int *vLen);
|
||||
int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen);
|
||||
|
||||
|
|
|
@ -25,11 +25,20 @@ typedef struct STEnv {
|
|||
char *jfname;
|
||||
int jfd;
|
||||
SPCache *pCache;
|
||||
SPager *pgrList;
|
||||
int nPager;
|
||||
int nPgrHash;
|
||||
SPager **pgrHash;
|
||||
} TENV;
|
||||
|
||||
int tdbEnvOpen(const char *rootDir, int pageSize, int cacheSize, TENV **ppEnv);
|
||||
int tdbEnvClose(TENV *pEnv);
|
||||
int tdbBegin(TENV *pEnv);
|
||||
int tdbCommit(TENV *pEnv);
|
||||
int tdbRollback(TENV *pEnv);
|
||||
|
||||
void tdbEnvAddPager(TENV *pEnv, SPager *pPager);
|
||||
void tdbEnvRemovePager(TENV *pEnv, SPager *pPager);
|
||||
SPager *tdbEnvGetPager(TENV *pEnv, const char *fname);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -16,8 +16,6 @@
|
|||
#ifndef _TD_TDB_INTERNAL_H_
|
||||
#define _TD_TDB_INTERNAL_H_
|
||||
|
||||
#include "os.h"
|
||||
|
||||
#include "tdb.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
@ -91,23 +89,6 @@ static FORCE_INLINE int tdbCmprPgId(const void *p1, const void *p2) {
|
|||
// dbname
|
||||
#define TDB_MAX_DBNAME_LEN 24
|
||||
|
||||
// tdb_log
|
||||
#define tdbError(var)
|
||||
|
||||
#define TERR_A(val, op, flag) \
|
||||
do { \
|
||||
if (((val) = (op)) != 0) { \
|
||||
goto flag; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define TERR_B(val, op, flag) \
|
||||
do { \
|
||||
if (((val) = (op)) == NULL) { \
|
||||
goto flag; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define TDB_VARIANT_LEN ((int)-1)
|
||||
|
||||
typedef int (*FKeyComparator)(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||
|
|
|
@ -24,6 +24,8 @@ extern "C" {
|
|||
#define TDB_FOR_TDENGINE
|
||||
|
||||
#ifdef TDB_FOR_TDENGINE
|
||||
#include "os.h"
|
||||
#include "thash.h"
|
||||
|
||||
// For memory -----------------
|
||||
#define tdbOsMalloc taosMemoryMalloc
|
||||
|
@ -95,7 +97,11 @@ typedef int tdb_fd_t;
|
|||
|
||||
#define tdbOsOpen(PATH, OPTION, MODE) open((PATH), (OPTION), (MODE))
|
||||
|
||||
#define tdbOsClose close
|
||||
#define tdbOsClose(FD) \
|
||||
do { \
|
||||
close(FD); \
|
||||
(FD) = -1; \
|
||||
} while (0)
|
||||
|
||||
i64 tdbOsRead(tdb_fd_t fd, void *pData, i64 nBytes);
|
||||
i64 tdbOsPRead(tdb_fd_t fd, void *pData, i64 nBytes, i64 offset);
|
||||
|
|
|
@ -33,6 +33,18 @@ extern "C" {
|
|||
SPager *pPager; \
|
||||
SPgid pgid;
|
||||
|
||||
// For page ref
|
||||
#define TDB_INIT_PAGE_REF(pPage) ((pPage)->nRef = 0)
|
||||
#if 0
|
||||
#define TDB_REF_PAGE(pPage) (++(pPage)->nRef)
|
||||
#define TDB_UNREF_PAGE(pPage) (--(pPage)->nRef)
|
||||
#define TDB_GET_PAGE_REF(pPage) ((pPage)->nRef)
|
||||
#else
|
||||
#define TDB_REF_PAGE(pPage) atomic_add_fetch_32(&((pPage)->nRef), 1)
|
||||
#define TDB_UNREF_PAGE(pPage) atomic_sub_fetch_32(&((pPage)->nRef), 1)
|
||||
#define TDB_GET_PAGE_REF(pPage) atomic_load_32(&((pPage)->nRef))
|
||||
#endif
|
||||
|
||||
int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache);
|
||||
int tdbPCacheClose(SPCache *pCache);
|
||||
SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, bool alcNewPage);
|
||||
|
|
|
@ -100,6 +100,7 @@ struct SPage {
|
|||
// APIs
|
||||
#define TDB_PAGE_TOTAL_CELLS(pPage) ((pPage)->nOverflow + (pPage)->pPageMethods->getCellNum(pPage))
|
||||
#define TDB_PAGE_USABLE_SIZE(pPage) ((u8 *)(pPage)->pPageFtr - (pPage)->pCellIdx)
|
||||
#define TDB_PAGE_FREE_SIZE(pPage) (*(pPage)->pPageMethods->getFreeBytes)(pPage)
|
||||
#define TDB_PAGE_PGNO(pPage) ((pPage)->pgid.pgno)
|
||||
#define TDB_BYTES_CELL_TAKEN(pPage, pCell) ((*(pPage)->xCellSize)(pPage, pCell) + (pPage)->pPageMethods->szOffset)
|
||||
#define TDB_PAGE_OFFSET_SIZE(pPage) ((pPage)->pPageMethods->szOffset)
|
||||
|
|
|
@ -20,13 +20,28 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
struct SPager {
|
||||
char *dbFileName;
|
||||
char *jFileName;
|
||||
int pageSize;
|
||||
uint8_t fid[TDB_FILE_ID_LEN];
|
||||
tdb_fd_t fd;
|
||||
tdb_fd_t jfd;
|
||||
SPCache *pCache;
|
||||
SPgno dbFileSize;
|
||||
SPgno dbOrigSize;
|
||||
SPage *pDirty;
|
||||
u8 inTran;
|
||||
SPager *pNext; // used by TENV
|
||||
SPager *pHashNext; // used by TENV
|
||||
};
|
||||
|
||||
int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager);
|
||||
int tdbPagerClose(SPager *pPager);
|
||||
int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate);
|
||||
int tdbPagerWrite(SPager *pPager, SPage *pPage);
|
||||
int tdbPagerBegin(SPager *pPager);
|
||||
int tdbPagerCommit(SPager *pPager);
|
||||
int tdbPagerGetPageSize(SPager *pPager);
|
||||
int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg);
|
||||
int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg);
|
||||
void tdbPagerReturnPage(SPager *pPager, SPage *pPage);
|
||||
|
|
|
@ -28,10 +28,6 @@ struct STxn {
|
|||
void *xArg;
|
||||
};
|
||||
|
||||
int tdbTxnBegin(TENV *pEnv);
|
||||
int tdbTxnCommit(TENV *pEnv);
|
||||
int tdbTxnRollback(TENV *pEnv);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -101,6 +101,8 @@ static inline int tdbGetVarInt(const u8 *p, int *v) {
|
|||
return n;
|
||||
}
|
||||
|
||||
static inline u32 tdbCstringHash(const char *s) { return MurmurHash3_32(s, strlen(s)); }
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -118,10 +118,10 @@ TEST(tdb_test, simple_test) {
|
|||
TENV *pEnv;
|
||||
TDB *pDb;
|
||||
FKeyComparator compFunc;
|
||||
int nData = 1000000;
|
||||
int nData = 50000000;
|
||||
|
||||
// Open Env
|
||||
ret = tdbEnvOpen("tdb", 4096, 256000, &pEnv);
|
||||
ret = tdbEnvOpen("tdb", 4096, 64, &pEnv);
|
||||
GTEST_ASSERT_EQ(ret, 0);
|
||||
|
||||
// Create a database
|
||||
|
@ -134,36 +134,23 @@ TEST(tdb_test, simple_test) {
|
|||
char val[64];
|
||||
|
||||
{ // Insert some data
|
||||
int i = 1;
|
||||
SPoolMem *pPool;
|
||||
int memPoolCapacity = 16 * 1024;
|
||||
for (int i = 1; i <= nData;) {
|
||||
tdbBegin(pEnv);
|
||||
|
||||
pPool = openPool();
|
||||
|
||||
tdbTxnBegin(pEnv);
|
||||
|
||||
for (;;) {
|
||||
if (i > nData) break;
|
||||
|
||||
sprintf(key, "key%d", i);
|
||||
sprintf(val, "value%d", i);
|
||||
ret = tdbDbInsert(pDb, key, strlen(key), val, strlen(val));
|
||||
GTEST_ASSERT_EQ(ret, 0);
|
||||
|
||||
if (pPool->size >= memPoolCapacity) {
|
||||
tdbTxnCommit(pEnv);
|
||||
|
||||
clearPool(pPool);
|
||||
|
||||
tdbTxnBegin(pEnv);
|
||||
for (int k = 0; k < 2000; k++) {
|
||||
sprintf(key, "key%d", i);
|
||||
sprintf(val, "value%d", i);
|
||||
ret = tdbDbInsert(pDb, key, strlen(key), val, strlen(val));
|
||||
GTEST_ASSERT_EQ(ret, 0);
|
||||
i++;
|
||||
}
|
||||
|
||||
i++;
|
||||
tdbCommit(pEnv);
|
||||
}
|
||||
|
||||
closePool(pPool);
|
||||
}
|
||||
|
||||
tdbCommit(pEnv);
|
||||
|
||||
{ // Query the data
|
||||
void *pVal = NULL;
|
||||
int vLen;
|
||||
|
@ -173,6 +160,7 @@ TEST(tdb_test, simple_test) {
|
|||
sprintf(val, "value%d", i);
|
||||
|
||||
ret = tdbDbGet(pDb, key, strlen(key), &pVal, &vLen);
|
||||
ASSERT(ret == 0);
|
||||
GTEST_ASSERT_EQ(ret, 0);
|
||||
|
||||
GTEST_ASSERT_EQ(vLen, strlen(val));
|
||||
|
|
|
@ -294,7 +294,7 @@ int64_t taosCloseFile(TdFilePtr *ppFile) {
|
|||
#if FILE_WITH_LOCK
|
||||
taosThreadRwlockWrlock(&((*ppFile)->rwlock));
|
||||
#endif
|
||||
if (ppFile == NULL || *ppFile == NULL || (*ppFile)->fd == -1) {
|
||||
if (ppFile == NULL || *ppFile == NULL) {
|
||||
return 0;
|
||||
}
|
||||
if ((*ppFile)->fp != NULL) {
|
||||
|
|
|
@ -23,15 +23,22 @@
|
|||
|
||||
#define TD_MEMORY_STACK_TRACE_DEPTH 10
|
||||
|
||||
typedef struct TdMemoryInfo *TdMemoryInfoPtr;
|
||||
|
||||
typedef struct TdMemoryInfo {
|
||||
int32_t symbol;
|
||||
int32_t memorySize;
|
||||
void *stackTrace[TD_MEMORY_STACK_TRACE_DEPTH]; // gdb: disassemble /m 0xXXX
|
||||
} *TdMemoryInfoPtr , TdMemoryInfo;
|
||||
// TdMemoryInfoPtr pNext;
|
||||
// TdMemoryInfoPtr pPrev;
|
||||
} TdMemoryInfo;
|
||||
|
||||
// static TdMemoryInfoPtr GlobalMemoryPtr = NULL;
|
||||
|
||||
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
|
||||
|
||||
#define tstrdup(str) _strdup(str)
|
||||
#else
|
||||
#define tstrdup(str) strdup(str)
|
||||
|
||||
#include<execinfo.h>
|
||||
|
||||
|
@ -129,6 +136,26 @@ void *taosMemoryRealloc(void *ptr, int32_t size) {
|
|||
#endif
|
||||
}
|
||||
|
||||
void *taosMemoryStrDup(void *ptr) {
|
||||
#ifdef USE_TD_MEMORY
|
||||
if (ptr == NULL) return NULL;
|
||||
|
||||
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char*)ptr - sizeof(TdMemoryInfo));
|
||||
assert(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL);
|
||||
|
||||
void *tmp = tstrdup((const char *)pTdMemoryInfo);
|
||||
if (tmp == NULL) return NULL;
|
||||
|
||||
memcpy(tmp, pTdMemoryInfo, sizeof(TdMemoryInfo));
|
||||
taosBackTrace(((TdMemoryInfoPtr)tmp)->stackTrace,TD_MEMORY_STACK_TRACE_DEPTH);
|
||||
|
||||
return (char*)tmp + sizeof(TdMemoryInfo);
|
||||
#else
|
||||
return tstrdup((const char *)ptr);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
void taosMemoryFree(const void *ptr) {
|
||||
if (ptr == NULL) return;
|
||||
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
#!/bin/bash
|
||||
|
||||
##################################################
|
||||
#
|
||||
# Do simulation test
|
||||
#
|
||||
##################################################
|
||||
|
||||
set -e
|
||||
#set -x
|
||||
|
||||
while read line
|
||||
do
|
||||
firstChar=`echo ${line:0:1}`
|
||||
if [[ -n "$line" ]] && [[ $firstChar != "#" ]]; then
|
||||
echo "======== $line ========"
|
||||
$line
|
||||
fi
|
||||
done < ./jenkins/basic.txt
|
||||
|
||||
|
|
@ -138,7 +138,7 @@ sql_error alter database db ntables 0
|
|||
sql_error alter database db ntables 1
|
||||
sql_error alter database db ntables 10
|
||||
|
||||
#print ============== modify replica
|
||||
#print ============== modify replica # TD-14409
|
||||
sql_error alter database db replica 2
|
||||
sql_error alter database db replica 5
|
||||
sql_error alter database db replica -1
|
||||
|
@ -270,7 +270,7 @@ if $data12_db != 2 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
#sql_error alter database db wal 0 # TD-14436
|
||||
sql_error alter database db wal 0 # TD-14436
|
||||
sql_error alter database db wal 3
|
||||
sql_error alter database db wal 100
|
||||
sql_error alter database db wal -1
|
||||
|
|
|
@ -142,68 +142,68 @@ if $row != 10 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != @2022-01-01 00:00:00.000@ then
|
||||
if $data00 != @22-01-01 00:00:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != 0 then
|
||||
return -1
|
||||
endi
|
||||
if $data90 != @2022-01-01 00:00:00.009@ then
|
||||
if $data90 != @22-01-01 00:00:00.009@ then
|
||||
return -1
|
||||
endi
|
||||
if $data91 != 9 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
sql select sum(c1), c1, avg(c1), min(c1), max(c2) from group_tb0 where c1 < 20 group by c1;
|
||||
if $row != 20 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print $data02
|
||||
if $data02 != 0.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data03 != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print $data04
|
||||
if $data04 != 0.00000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data10 != 100 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data11 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print $data12
|
||||
if $data12 != 1.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data13 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data14 != 1.00000 then
|
||||
print expect 1.00000, actual:$data14
|
||||
return -1
|
||||
endi
|
||||
print ============> filter not supported yet.
|
||||
#sql select sum(c1), c1, avg(c1), min(c1), max(c2) from group_tb0 where c1 < 20 group by c1;
|
||||
#if $row != 20 then
|
||||
# return -1
|
||||
#endi
|
||||
#
|
||||
#if $data00 != 0 then
|
||||
# return -1
|
||||
#endi
|
||||
#
|
||||
#if $data01 != 0 then
|
||||
# return -1
|
||||
#endi
|
||||
#
|
||||
#print $data02
|
||||
#if $data02 != 0.000000000 then
|
||||
# return -1
|
||||
#endi
|
||||
#
|
||||
#if $data03 != 0 then
|
||||
# return -1
|
||||
#endi
|
||||
#
|
||||
#print $data04
|
||||
#if $data04 != 0.00000 then
|
||||
# return -1
|
||||
#endi
|
||||
#
|
||||
#if $data10 != 100 then
|
||||
# return -1
|
||||
#endi
|
||||
#
|
||||
#if $data11 != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
#
|
||||
#print $data12
|
||||
#if $data12 != 1.000000000 then
|
||||
# return -1
|
||||
#endi
|
||||
#
|
||||
#if $data13 != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
#
|
||||
#if $data14 != 1.00000 then
|
||||
# print expect 1.00000, actual:$data14
|
||||
# return -1
|
||||
#endi
|
||||
|
||||
sql_error select sum(c1), ts, c1 from group_tb0 where c1<20 group by c1;
|
||||
sql_error select first(ts), ts, c2 from group_tb0 where c1 < 20 group by c1;
|
||||
|
|
Loading…
Reference in New Issue