diff --git a/include/libs/index/index.h b/include/libs/index/index.h
index d2b157542f..47eb97cc3a 100644
--- a/include/libs/index/index.h
+++ b/include/libs/index/index.h
@@ -76,25 +76,20 @@ void indexOptsDestroy(SIndexOpts* opts);
* @param:
*/
-SIndexTerm* indexTermCreate(int64_t suid,
- SIndexOperOnColumn operType,
- uint8_t colType,
- const char* colName,
- int32_t nColName,
- const char* colVal,
- int32_t nColVal);
+SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn operType, uint8_t colType, const char* colName,
+ int32_t nColName, const char* colVal, int32_t nColVal);
void indexTermDestroy(SIndexTerm* p);
/*
- * init index
- *
- */
-int32_t indexInit();
-/*
- * destory index
+ * init index env
*
*/
+void indexInit();
+/*
+ * destroy index env
+ *
+ */
void indexCleanUp();
#ifdef __cplusplus
diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h
index 0d5792fd91..da88366f11 100644
--- a/include/libs/qcom/query.h
+++ b/include/libs/qcom/query.h
@@ -91,9 +91,10 @@ enum {
META_TYPE_NON_TABLE = 1,
META_TYPE_CTABLE,
META_TYPE_TABLE,
- META_TYPE_BOTH_TABLE,
+ META_TYPE_BOTH_TABLE
};
+
typedef struct STableMetaOutput {
int32_t metaType;
char ctbFname[TSDB_TABLE_FNAME_LEN];
diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h
index 1114b66a80..74b7813465 100644
--- a/include/libs/scheduler/scheduler.h
+++ b/include/libs/scheduler/scheduler.h
@@ -24,7 +24,7 @@ extern "C" {
#include "catalog.h"
typedef struct SSchedulerCfg {
- int32_t maxJobNum;
+ uint32_t maxJobNum;
} SSchedulerCfg;
typedef struct SQueryProfileSummary {
diff --git a/include/util/tqueue.h b/include/util/tqueue.h
index a57bdb5ce8..63ba460d39 100644
--- a/include/util/tqueue.h
+++ b/include/util/tqueue.h
@@ -51,6 +51,7 @@ void taosFreeQitem(void *pItem);
int32_t taosWriteQitem(STaosQueue *queue, void *pItem);
int32_t taosReadQitem(STaosQueue *queue, void **ppItem);
bool taosQueueEmpty(STaosQueue *queue);
+int32_t taosQueueSize(STaosQueue *queue);
STaosQall *taosAllocateQall();
void taosFreeQall(STaosQall *qall);
diff --git a/source/dnode/vnode/meta/CMakeLists.txt b/source/dnode/vnode/meta/CMakeLists.txt
index bb48d1acad..7041811617 100644
--- a/source/dnode/vnode/meta/CMakeLists.txt
+++ b/source/dnode/vnode/meta/CMakeLists.txt
@@ -19,11 +19,13 @@ add_library(meta STATIC ${META_SRC})
target_include_directories(
meta
PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/vnode/meta"
+ PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/index"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
meta
PUBLIC common
+ PUBLIC index
)
if(${META_DB_IMPL} STREQUAL "BDB")
diff --git a/source/dnode/vnode/meta/src/metaIdx.c b/source/dnode/vnode/meta/src/metaIdx.c
index fe07f5ced4..828bd12088 100644
--- a/source/dnode/vnode/meta/src/metaIdx.c
+++ b/source/dnode/vnode/meta/src/metaIdx.c
@@ -13,9 +13,13 @@
* along with this program. If not, see .
*/
+#include "index.h"
#include "metaDef.h"
struct SMetaIdx {
+#ifdef USE_INVERTED_INDEX
+ SIndex *pIdx;
+#endif
/* data */
};
@@ -43,6 +47,13 @@ int metaOpenIdx(SMeta *pMeta) {
rocksdb_options_destroy(options);
#endif
+#ifdef USE_INVERTED_INDEX
+ SIndexOpts opts;
+ if (indexOpen(&opts, pMeta->path, &pMeta->pIdx->pIdx) != 0) {
+ return -1;
+ }
+
+#endif
return 0;
}
@@ -53,14 +64,47 @@ void metaCloseIdx(SMeta *pMeta) { /* TODO */
pMeta->pIdx = NULL;
}
#endif
+
+#ifdef USE_INVERTED_INDEX
+ SIndexOpts opts;
+ if (indexClose(pMeta->pIdx->pIdx) != 0) {
+ return -1;
+ }
+
+#endif
}
-int metaSaveTableToIdx(SMeta *pMeta, const STbCfg *pTbOptions) {
+int metaSaveTableToIdx(SMeta *pMeta, const STbCfg *pTbCfg) {
+#ifdef USE_INVERTED_INDEX
+ if (pTbCfgs - type == META_CHILD_TABLE) {
+ char buf[8] = {0};
+ int16_t colId = (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId;
+ sprintf(buf, "%d", colId); // colname
+
+ char *pTagVal = (char *)tdGetKVRowValOfCol(pTbCfg->ctbCfg.pTag, (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId);
+
+ tb_uid_t suid = pTbCfg->ctbCfg.suid; // super id
+ tb_uid_t tuid = 0; // child table uid
+ SIndexMultiTerm *terms = indexMultiTermCreate();
+ SIndexTerm * term =
+ indexTermCreate(suid, ADD_VALUE, TSDB_DATA_TYPE_BINARY, buf, strlen(buf), pTagVal, strlen(pTagVal), tuid);
+ indexMultiTermAdd(terms, term);
+
+ int ret = indexPut(pMeta->pIdx->pIdx, terms);
+ indexMultiTermDestroy(terms);
+ return ret;
+ } else {
+ return DB_DONOTINDEX;
+ }
+#endif
// TODO
return 0;
}
int metaRemoveTableFromIdx(SMeta *pMeta, tb_uid_t uid) {
+#ifdef USE_INVERTED_INDEX
+
+#endif
// TODO
return 0;
-}
\ No newline at end of file
+}
diff --git a/source/dnode/vnode/meta/src/metaTbUid.c b/source/dnode/vnode/meta/src/metaTbUid.c
index be85b45d95..cad1eba134 100644
--- a/source/dnode/vnode/meta/src/metaTbUid.c
+++ b/source/dnode/vnode/meta/src/metaTbUid.c
@@ -22,9 +22,10 @@ int metaOpenUidGnrt(SMeta *pMeta) {
return 0;
}
-void metaCloseUidGnrt(SMeta *pMeta) { /* TODO */ }
+void metaCloseUidGnrt(SMeta *pMeta) { /* TODO */
+}
tb_uid_t metaGenerateUid(SMeta *pMeta) {
// Generate a new table UID
return ++(pMeta->uidGnrt.nextUid);
-}
\ No newline at end of file
+}
diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c
index da0951fe1f..94f34b8e17 100644
--- a/source/libs/catalog/src/catalog.c
+++ b/source/libs/catalog/src/catalog.c
@@ -1226,6 +1226,16 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
tNameGetFullDbName(pTableName, db);
CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, db, false, &dbVgroup));
+  // REMOVE THIS ....
+ if (0 == tbMeta->vgId) {
+ SVgroupInfo vgroup = {0};
+
+ catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pTableName, &vgroup);
+
+ tbMeta->vgId = vgroup.vgId;
+ }
+ // REMOVE THIS ....
+
if (tbMeta->tableType == TSDB_SUPER_TABLE) {
CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, dbVgroup, pVgroupList));
} else {
diff --git a/source/libs/executor/inc/dataSinkInt.h b/source/libs/executor/inc/dataSinkInt.h
index 3f0b150c8e..1bbf5494dd 100644
--- a/source/libs/executor/inc/dataSinkInt.h
+++ b/source/libs/executor/inc/dataSinkInt.h
@@ -26,17 +26,26 @@ extern "C" {
struct SDataSink;
struct SDataSinkHandle;
-typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SDataResult* pRes);
-typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, char* pData, int32_t* pLen);
+typedef struct SDataSinkManager {
+ SDataSinkMgtCfg cfg;
+ pthread_mutex_t mutex;
+} SDataSinkManager;
+
+typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus);
+typedef void (*FEndPut)(struct SDataSinkHandle* pHandle);
+typedef int32_t (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pStatus);
+typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus);
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
typedef struct SDataSinkHandle {
FPutDataBlock fPut;
- FGetDataBlock fGet;
+ FEndPut fEndPut;
+ FGetDataLength fGetLen;
+ FGetDataBlock fGetData;
FDestroyDataSinker fDestroy;
} SDataSinkHandle;
-int32_t createDataDispatcher(const struct SDataSink* pDataSink, DataSinkHandle* pHandle);
+int32_t createDataDispatcher(SDataSinkManager* pManager, const struct SDataSink* pDataSink, DataSinkHandle* pHandle);
#ifdef __cplusplus
}
diff --git a/source/libs/executor/inc/dataSinkMgt.h b/source/libs/executor/inc/dataSinkMgt.h
index fab5958107..d13423b25d 100644
--- a/source/libs/executor/inc/dataSinkMgt.h
+++ b/source/libs/executor/inc/dataSinkMgt.h
@@ -26,6 +26,8 @@ extern "C" {
#define DS_CAPACITY_ENOUGH 1
#define DS_CAPACITY_FULL 2
#define DS_NEED_SCHEDULE 3
+#define DS_END 4
+#define DS_IN_PROCESS 5
struct SDataSink;
struct SSDataBlock;
@@ -39,11 +41,16 @@ int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg);
typedef void* DataSinkHandle;
-typedef struct SDataResult {
- SQueryCostInfo profile;
+typedef struct SInputData {
const SSDataBlock* pData;
SHashObj* pTableRetrieveTsMap;
-} SDataResult;
+} SInputData;
+
+typedef struct SOutPutData {
+ int32_t numOfRows;
+ int8_t compressed;
+ char* pData;
+} SOutPutData;
/**
* Create a subplan's datasinker handle for all later operations.
@@ -59,30 +66,25 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH
* @param pRes
* @return error code
*/
-int32_t dsPutDataBlock(DataSinkHandle handle, const SDataResult* pRes);
+int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus);
+
+void dsEndPut(DataSinkHandle handle);
/**
* Get the length of the data returned by the next call to dsGetDataBlock.
* @param handle
* @return data length
*/
-int32_t dsGetDataLength(DataSinkHandle handle);
+int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus);
/**
* Get data, the caller needs to allocate data memory.
* @param handle
- * @param pData output
- * @param pLen output
+ * @param pOutput output
+ * @param pStatus output
* @return error code
*/
-int32_t dsGetDataBlock(DataSinkHandle handle, char* pData, int32_t* pLen);
-
-/**
- * Get the datasinker state, after each dsPutDataBlock and dsGetDataBlock call.
- * @param handle
- * @return datasinker status
- */
-int32_t dsGetStatus(DataSinkHandle handle);
+int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus);
/**
* After dsGetStatus returns DS_NEED_SCHEDULE, the caller need to put this into the work queue.
diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c
index b2c135e96d..3d8e51d04d 100644
--- a/source/libs/executor/src/dataDispatcher.c
+++ b/source/libs/executor/src/dataDispatcher.c
@@ -20,21 +20,29 @@
#include "tglobal.h"
#include "tqueue.h"
-#define GET_BUF_DATA(buf) (buf)->pData + (buf)->pos
-#define GET_BUF_REMAIN(buf) (buf)->remain
+#define DATA_META_LENGTH(tables) (sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(tables) + sizeof(SRetrieveTableRsp))
-typedef struct SBuf {
- int32_t size;
- int32_t pos;
- int32_t remain;
+typedef struct SDataDispatchBuf {
+ int32_t useSize;
+ int32_t allocSize;
char* pData;
-} SBuf;
+} SDataDispatchBuf;
+
+typedef struct SDataCacheEntry {
+ int32_t dataLen;
+ int32_t numOfRows;
+ int8_t compressed;
+ char data[];
+} SDataCacheEntry;
typedef struct SDataDispatchHandle {
SDataSinkHandle sink;
+ SDataSinkManager* pManager;
SDataBlockSchema schema;
STaosQueue* pDataBlocks;
- SBuf buf;
+ SDataDispatchBuf nextOutput;
+ int32_t status;
+ pthread_mutex_t mutex;
} SDataDispatchHandle;
static bool needCompress(const SSDataBlock* pData, const SDataBlockSchema* pSchema) {
@@ -53,87 +61,156 @@ static bool needCompress(const SSDataBlock* pData, const SDataBlockSchema* pSche
return false;
}
-static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) {
+static int32_t compressColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) {
int32_t colSize = pColRes->info.bytes * numOfRows;
return (*(tDataTypes[pColRes->info.type].compFunc))(
pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
}
-static void doCopyQueryResultToMsg(const SDataResult* pRes, const SDataBlockSchema* pSchema, char* data, int8_t compressed, int32_t *compLen) {
+static void copyData(const SInputData* pInput, const SDataBlockSchema* pSchema, char* data, int8_t compressed, int32_t *compLen) {
int32_t *compSizes = (int32_t*)data;
if (compressed) {
data += pSchema->numOfCols * sizeof(int32_t);
}
for (int32_t col = 0; col < pSchema->numOfCols; ++col) {
- SColumnInfoData* pColRes = taosArrayGet(pRes->pData->pDataBlock, col);
+ SColumnInfoData* pColRes = taosArrayGet(pInput->pData->pDataBlock, col);
if (compressed) {
- compSizes[col] = compressQueryColData(pColRes, pRes->pData->info.rows, data, compressed);
+ compSizes[col] = compressColData(pColRes, pInput->pData->info.rows, data, compressed);
data += compSizes[col];
*compLen += compSizes[col];
compSizes[col] = htonl(compSizes[col]);
} else {
- memmove(data, pColRes->pData, pColRes->info.bytes * pRes->pData->info.rows);
- data += pColRes->info.bytes * pRes->pData->info.rows;
+ memmove(data, pColRes->pData, pColRes->info.bytes * pInput->pData->info.rows);
+ data += pColRes->info.bytes * pInput->pData->info.rows;
}
}
- int32_t numOfTables = (int32_t) taosHashGetSize(pRes->pTableRetrieveTsMap);
+ int32_t numOfTables = (int32_t) taosHashGetSize(pInput->pTableRetrieveTsMap);
*(int32_t*)data = htonl(numOfTables);
data += sizeof(int32_t);
- STableIdInfo* item = taosHashIterate(pRes->pTableRetrieveTsMap, NULL);
+ STableIdInfo* item = taosHashIterate(pInput->pTableRetrieveTsMap, NULL);
while (item) {
STableIdInfo* pDst = (STableIdInfo*)data;
pDst->uid = htobe64(item->uid);
pDst->key = htobe64(item->key);
data += sizeof(STableIdInfo);
- item = taosHashIterate(pRes->pTableRetrieveTsMap, item);
+ item = taosHashIterate(pInput->pTableRetrieveTsMap, item);
}
}
-static void toRetrieveResult(SDataDispatchHandle* pHandle, const SDataResult* pRes, char* pData, int32_t* pContLen) {
- SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)pData;
- pRsp->useconds = htobe64(pRes->profile.elapsedTime);
- pRsp->precision = htons(pHandle->schema.precision);
- pRsp->compressed = (int8_t)needCompress(pRes->pData, &(pHandle->schema));
- pRsp->numOfRows = htonl(pRes->pData->info.rows);
+// data format with compress: SDataCacheEntry | cols_data_offset | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ...
+// data format: SDataCacheEntry | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ...
+static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
+ SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
+ pEntry->compressed = (int8_t)needCompress(pInput->pData, &(pHandle->schema));
+ pEntry->numOfRows = pInput->pData->info.rows;
- *pContLen = sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(pRes->pTableRetrieveTsMap) + sizeof(SRetrieveTableRsp);
- doCopyQueryResultToMsg(pRes, &pHandle->schema, pRsp->data, pRsp->compressed, &pRsp->compLen);
- *pContLen += (pRsp->compressed ? pRsp->compLen : pHandle->schema.resultRowSize * pRes->pData->info.rows);
-
- pRsp->compLen = htonl(pRsp->compLen);
+ pBuf->useSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap);
+ copyData(pInput, &pHandle->schema, pEntry->data, pEntry->compressed, &pEntry->dataLen);
+ pBuf->useSize += (pEntry->compressed ? pEntry->dataLen : pHandle->schema.resultRowSize * pInput->pData->info.rows);
// todo completed
}
-static int32_t putDataBlock(SDataSinkHandle* pHandle, const SDataResult* pRes) {
- SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
- int32_t useSize = 0;
- toRetrieveResult(pDispatcher, pRes, GET_BUF_DATA(&pDispatcher->buf), &useSize);
+static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
+ if (taosQueueSize(pDispatcher->pDataBlocks) >= pDispatcher->pManager->cfg.maxDataBlockNumPerQuery) {
+ return false;
+ }
+ pBuf->allocSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap) + pDispatcher->schema.resultRowSize * pInput->pData->info.rows;
+ pBuf->pData = malloc(pBuf->allocSize);
+ return NULL != pBuf->pData;
}
-static int32_t getDataBlock(SDataSinkHandle* pHandle, char* pData, int32_t* pLen) {
+static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
+ pthread_mutex_lock(&pDispatcher->mutex);
+ int32_t status = taosQueueSize(pDispatcher->pDataBlocks) < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_CAPACITY_ENOUGH : DS_CAPACITY_FULL;
+ pDispatcher->status = status;
+ pthread_mutex_unlock(&pDispatcher->mutex);
+ return status;
+}
+static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
+ pthread_mutex_lock(&pDispatcher->mutex);
+ int32_t status = pDispatcher->status;
+ pthread_mutex_unlock(&pDispatcher->mutex);
+ return status;
+}
+
+static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus) {
+ SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
+ SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf));
+ if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) {
+ return TSDB_CODE_QRY_OUT_OF_MEMORY;
+ }
+ toDataCacheEntry(pDispatcher, pInput, pBuf);
+ taosWriteQitem(pDispatcher->pDataBlocks, pBuf);
+ *pStatus = updateStatus(pDispatcher);
+ return TSDB_CODE_SUCCESS;
+}
+
+static void endPut(struct SDataSinkHandle* pHandle) {
+ SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
+ pthread_mutex_lock(&pDispatcher->mutex);
+ pDispatcher->status = DS_END;
+ pthread_mutex_unlock(&pDispatcher->mutex);
+}
+
+static int32_t getDataLength(SDataSinkHandle* pHandle, int32_t* pStatus) {
+ SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
+ if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
+ *pStatus = getStatus(pDispatcher) ? DS_END : DS_IN_PROCESS;
+ return 0;
+ }
+ SDataDispatchBuf* pBuf = NULL;
+ taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
+ memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
+ taosFreeQitem(pBuf);
+ return ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen;
+}
+
+static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus) {
+ SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
+ SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
+ memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
+ pOutput->numOfRows = pEntry->numOfRows;
+ pOutput->compressed = pEntry->compressed;
+ tfree(pDispatcher->nextOutput.pData); // todo persistent
+ *pStatus = updateStatus(pDispatcher);
+ return TSDB_CODE_SUCCESS;
}
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
-
+ SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
+ tfree(pDispatcher->nextOutput.pData);
+ while (!taosQueueEmpty(pDispatcher->pDataBlocks)) {
+ SDataDispatchBuf* pBuf = NULL;
+ taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
+ tfree(pBuf->pData);
+ taosFreeQitem(pBuf);
+ }
+ taosCloseQueue(pDispatcher->pDataBlocks);
+ pthread_mutex_destroy(&pDispatcher->mutex);
}
-int32_t createDataDispatcher(const SDataSink* pDataSink, DataSinkHandle* pHandle) {
+int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSink* pDataSink, DataSinkHandle* pHandle) {
SDataDispatchHandle* dispatcher = calloc(1, sizeof(SDataDispatchHandle));
if (NULL == dispatcher) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
- return TSDB_CODE_FAILED;
+ return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
dispatcher->sink.fPut = putDataBlock;
- dispatcher->sink.fGet = getDataBlock;
+ dispatcher->sink.fGetLen = getDataLength;
+ dispatcher->sink.fGetData = getDataBlock;
dispatcher->sink.fDestroy = destroyDataSinker;
+ dispatcher->pManager = pManager;
+ dispatcher->schema = pDataSink->schema;
+ dispatcher->status = DS_CAPACITY_ENOUGH;
dispatcher->pDataBlocks = taosOpenQueue();
+ pthread_mutex_init(&dispatcher->mutex, NULL);
if (NULL == dispatcher->pDataBlocks) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
- return TSDB_CODE_FAILED;
+ return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
*pHandle = dispatcher;
return TSDB_CODE_SUCCESS;
diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c
index 2193babc76..8a96c5d05f 100644
--- a/source/libs/executor/src/dataSinkMgt.c
+++ b/source/libs/executor/src/dataSinkMgt.c
@@ -17,33 +17,38 @@
#include "dataSinkInt.h"
#include "planner.h"
+static SDataSinkManager gDataSinkManager = {0};
+
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg) {
- // todo
+ gDataSinkManager.cfg = *cfg;
+ pthread_mutex_init(&gDataSinkManager.mutex, NULL);
}
int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pHandle) {
if (DSINK_Dispatch == pDataSink->info.type) {
- return createDataDispatcher(pDataSink, pHandle);
+ return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle);
}
return TSDB_CODE_FAILED;
}
-int32_t dsPutDataBlock(DataSinkHandle handle, const SDataResult* pRes) {
+int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
- return pHandleImpl->fPut(pHandleImpl, pRes);
+ return pHandleImpl->fPut(pHandleImpl, pInput, pStatus);
}
-int32_t dsGetDataLength(DataSinkHandle handle) {
- // todo
-}
-
-int32_t dsGetDataBlock(DataSinkHandle handle, char* pData, int32_t* pLen) {
+void dsEndPut(DataSinkHandle handle) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
- return pHandleImpl->fGet(pHandleImpl, pData, pLen);
+ return pHandleImpl->fEndPut(pHandleImpl);
}
-int32_t dsGetStatus(DataSinkHandle handle) {
- // todo
+int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus) {
+ SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
+ return pHandleImpl->fGetLen(pHandleImpl, pStatus);
+}
+
+int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus) {
+ SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
+ return pHandleImpl->fGetData(pHandleImpl, pOutput, pStatus);
}
void dsScheduleProcess(void* ahandle, void* pItem) {
diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c
index d0b8fa4290..04dac57d8f 100644
--- a/source/libs/index/src/index.c
+++ b/source/libs/index/src/index.c
@@ -30,21 +30,17 @@
void* indexQhandle = NULL;
-int32_t indexInit() {
+void indexInit() {
+ // refactor later
indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
- return indexQhandle == NULL ? -1 : 0;
- // do nothing
}
void indexCleanUp() { taosCleanUpScheduler(indexQhandle); }
static int uidCompare(const void* a, const void* b) {
+ // add more version compare
uint64_t u1 = *(uint64_t*)a;
uint64_t u2 = *(uint64_t*)b;
- if (u1 == u2) {
- return 0;
- } else {
- return u1 < u2 ? -1 : 1;
- }
+ return u1 - u2;
}
typedef struct SIdxColInfo {
int colId; // generated by index internal
@@ -61,7 +57,7 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oTyp
static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
- // pthread_once(&isInit, indexInit);
+ pthread_once(&isInit, indexInit);
SIndex* sIdx = calloc(1, sizeof(SIndex));
if (sIdx == NULL) { return -1; }
diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c
index 90a730d3a9..495c4d4477 100644
--- a/source/libs/index/src/index_tfile.c
+++ b/source/libs/index/src/index_tfile.c
@@ -71,7 +71,10 @@ TFileCache* tfileCacheCreate(const char* path) {
}
TFileReader* reader = tfileReaderCreate(wc);
- if (reader == NULL) { goto End; }
+ if (reader == NULL) {
+ indexInfo("skip invalid file: %s", file);
+ continue;
+ }
TFileHeader* header = &reader->header;
ICacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName)};
diff --git a/source/libs/parser/src/astValidate.c b/source/libs/parser/src/astValidate.c
index 3ca3d87a79..5cabbb5e3b 100644
--- a/source/libs/parser/src/astValidate.c
+++ b/source/libs/parser/src/astValidate.c
@@ -3628,6 +3628,33 @@ int32_t evaluateSqlNode(SSqlNode* pNode, int32_t tsPrecision, SMsgBuf* pMsgBuf)
return TSDB_CODE_SUCCESS;
}
+int32_t setTableVgroupList(SParseBasicCtx *pCtx, SName* name, SVgroupsInfo **pVgList) {
+ SArray* vgroupList = NULL;
+ int32_t code = catalogGetTableDistVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, name, &vgroupList);
+ if (code != TSDB_CODE_SUCCESS) {
+ return code;
+ }
+
+ int32_t vgroupNum = taosArrayGetSize(vgroupList);
+
+ SVgroupsInfo *vgList = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupMsg) * vgroupNum);
+
+ vgList->numOfVgroups = vgroupNum;
+
+ for (int32_t i = 0; i < vgroupNum; ++i) {
+ SVgroupInfo *vg = taosArrayGet(vgroupList, i);
+ vgList->vgroups[i].vgId = vg->vgId;
+ vgList->vgroups[i].numOfEps = vg->numOfEps;
+ memcpy(vgList->vgroups[i].epAddr, vg->epAddr, sizeof(vgList->vgroups[i].epAddr));
+ }
+
+ *pVgList = vgList;
+
+ taosArrayDestroy(vgroupList);
+
+ return TSDB_CODE_SUCCESS;
+}
+
int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmtInfo* pQueryInfo, char* msgBuf, int32_t msgBufLen) {
assert(pCtx != NULL && pInfo != NULL);
int32_t code = 0;
@@ -3916,7 +3943,7 @@ int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmt
if (code != TSDB_CODE_SUCCESS) {
return code;
}
-
+
data.pTableMeta = taosArrayInit(1, POINTER_BYTES);
taosArrayPush(data.pTableMeta, &pmt);
@@ -3926,6 +3953,12 @@ int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmt
pQueryInfo->pTableMetaInfo[0]->name = *name;
pQueryInfo->numOfTables = 1;
+ code = setTableVgroupList(pCtx, name, &pQueryInfo->pTableMetaInfo[0]->vgroupList);
+ if (code != TSDB_CODE_SUCCESS) {
+ taosArrayDestroy(data.pTableMeta);
+ return code;
+ }
+
// evaluate the sqlnode
STableMeta* pTableMeta = (STableMeta*) taosArrayGetP(data.pTableMeta, 0);
assert(pTableMeta != NULL);
diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c
index d22f92517d..f440e6cdfe 100644
--- a/source/libs/parser/src/parser.c
+++ b/source/libs/parser/src/parser.c
@@ -237,6 +237,9 @@ void qParserCleanupMetaRequestInfo(SCatalogReq* pMetaReq) {
}
void qDestroyQuery(SQueryNode* pQueryNode) {
+ if (NULL == pQueryNode) {
+ return;
+ }
if (nodeType(pQueryNode) == TSDB_SQL_INSERT || nodeType(pQueryNode) == TSDB_SQL_CREATE_TABLE) {
SVnodeModifOpStmtInfo* pModifInfo = (SVnodeModifOpStmtInfo*)pQueryNode;
taosArrayDestroy(pModifInfo->pDataBlocks);
diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c
index 461f16cdf0..bbb84223ac 100644
--- a/source/libs/planner/src/physicalPlan.c
+++ b/source/libs/planner/src/physicalPlan.c
@@ -160,9 +160,6 @@ static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableI
return (SPhyNode*)node;
}
-static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
- return createUserTableScanNode(pPlanNode, pTable, OP_TableScan);
-}
static bool isSystemTable(SQueryTableInfo* pTable) {
// todo
@@ -259,12 +256,20 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) {
return (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType);
}
+static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, SSubplan* subplan) {
+ vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[0]), &subplan->execNode);
+
+ return createUserTableScanNode(pPlanNode, pTable, OP_TableScan);
+}
+
+
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;
+
if (needMultiNodeScan(pTable)) {
return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable));
}
- return createSingleTableScanNode(pPlanNode, pTable);
+ return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan);
}
static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
@@ -322,12 +327,12 @@ static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
if (QNODE_MODIFY == pRoot->info.type) {
splitModificationOpSubPlan(pCxt, pRoot);
} else {
- SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE);
+ SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
++(pCxt->nextId.templateId);
subplan->msgType = TDMT_VND_QUERY;
subplan->pNode = createPhyNode(pCxt, pRoot);
- subplan->pDataSink = createDataDispatcher(pCxt, pRoot);
+ subplan->pDataSink = createDataDispatcher(pCxt, pRoot);
}
// todo deal subquery
}
diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c
index faeed74f1a..20da1842cf 100644
--- a/source/libs/planner/src/physicalPlanJson.c
+++ b/source/libs/planner/src/physicalPlanJson.c
@@ -398,7 +398,7 @@ static bool exprNodeFromJson(const cJSON* json, void* obj) {
case TEXPR_FUNCTION_NODE:
return fromObject(json, jkExprNodeFunction, functionFromJson, exprInfo, false);
case TEXPR_COL_NODE:
- return fromObject(json, jkExprNodeColumn, schemaFromJson, exprInfo->pSchema, false);
+ return fromObjectWithAlloc(json, jkExprNodeColumn, schemaFromJson, (void**)&exprInfo->pSchema, sizeof(SSchema), false);
case TEXPR_VALUE_NODE:
return fromObject(json, jkExprNodeValue, variantFromJson, exprInfo->pVal, false);
default:
diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h
index a7ec39bfde..270f255ec0 100644
--- a/source/libs/scheduler/inc/schedulerInt.h
+++ b/source/libs/scheduler/inc/schedulerInt.h
@@ -37,10 +37,10 @@ enum {
};
typedef struct SSchedulerMgmt {
- uint64_t taskId;
- uint64_t sId;
+  uint64_t        taskId; // sequential taskId
+ uint64_t sId; // schedulerId
SSchedulerCfg cfg;
- SHashObj *jobs; // key: queryId, value: SQueryJob*
+ SHashObj *jobs; // key: queryId, value: SQueryJob*
} SSchedulerMgmt;
typedef struct SSchCallbackParam {
@@ -83,52 +83,61 @@ typedef struct SSchJobAttr {
typedef struct SSchJob {
uint64_t queryId;
- int32_t levelNum;
- int32_t levelIdx;
- int8_t status;
SSchJobAttr attr;
- SEpSet dataSrcEps;
- SEpAddr resEp;
+ int32_t levelNum;
void *transport;
SArray *nodeList; // qnode/vnode list, element is SQueryNodeAddr
- tsem_t rspSem;
- int32_t userFetch;
- int32_t remoteFetch;
- SSchTask *fetchTask;
-
- int32_t errCode;
- void *res;
- int32_t resNumOfRows;
+ SArray *levels; // Element is SQueryLevel, starting from 0. SArray
+ SArray *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler
+ int32_t levelIdx;
+ SEpSet dataSrcEps;
SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask*
SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask*
- SArray *levels; // Element is SQueryLevel, starting from 0. SArray
- SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. SArray
-
+ int8_t status;
+ SQueryNodeAddr resNode;
+ tsem_t rspSem;
+ int32_t userFetch;
+ int32_t remoteFetch;
+ SSchTask *fetchTask;
+ int32_t errCode;
+ void *res;
+ int32_t resNumOfRows;
SQueryProfileSummary summary;
} SSchJob;
#define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE
-#define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE
+#define SCH_TASK_READY_TO_LUNCH(task) (atomic_load_32(&(task)->childReady) >= taosArrayGetSize((task)->children))
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN)
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
-#define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__)
-#define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__)
+#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
+#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
+
+#define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st)
+#define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status)
+
+#define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) != QUERY_TYPE_MODIFY)
+#define SCH_JOB_NEED_FETCH(pAttr) ((pAttr)->queryJob)
+
+#define SCH_JOB_ELOG(param, ...) qError("QID:%"PRIx64" " param, pJob->queryId, __VA_ARGS__)
+#define SCH_JOB_DLOG(param, ...) qDebug("QID:%"PRIx64" " param, pJob->queryId, __VA_ARGS__)
+
+#define SCH_TASK_ELOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
+#define SCH_TASK_DLOG(param, ...) qDebug("QID:%"PRIx64",TID:%"PRIx64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
-#define SCH_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock))
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
-extern int32_t schLaunchTask(SSchJob *job, SSchTask *task);
-extern int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType);
+static int32_t schLaunchTask(SSchJob *job, SSchTask *task);
+static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType);
#ifdef __cplusplus
}
diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c
index a07620f102..3f8c75a78c 100644
--- a/source/libs/scheduler/src/scheduler.c
+++ b/source/libs/scheduler/src/scheduler.c
@@ -20,203 +20,263 @@
static SSchedulerMgmt schMgmt = {0};
-int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) {
- for (int32_t i = 0; i < job->levelNum; ++i) {
- SSchLevel *level = taosArrayGet(job->levels, i);
+int32_t schValidateStatus(SSchJob *pJob, int8_t oriStatus, int8_t newStatus) {
+ int32_t code = 0;
+
+ if (oriStatus == newStatus) {
+ SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
+ }
+
+ switch (oriStatus) {
+ case JOB_TASK_STATUS_NULL:
+ if (newStatus != JOB_TASK_STATUS_EXECUTING
+ && newStatus != JOB_TASK_STATUS_FAILED
+ && newStatus != JOB_TASK_STATUS_NOT_START) {
+ SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
+ }
+
+ break;
+ case JOB_TASK_STATUS_NOT_START:
+ if (newStatus != JOB_TASK_STATUS_CANCELLED) {
+ SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
+ }
+
+ break;
+ case JOB_TASK_STATUS_EXECUTING:
+ if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED
+ && newStatus != JOB_TASK_STATUS_FAILED
+ && newStatus != JOB_TASK_STATUS_CANCELLING
+ && newStatus != JOB_TASK_STATUS_CANCELLED
+ && newStatus != JOB_TASK_STATUS_DROPPING) {
+ SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
+ }
+
+ break;
+ case JOB_TASK_STATUS_PARTIAL_SUCCEED:
+ if (newStatus != JOB_TASK_STATUS_EXECUTING
+ && newStatus != JOB_TASK_STATUS_SUCCEED
+ && newStatus != JOB_TASK_STATUS_CANCELLED) {
+ SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
+ }
+
+ break;
+ case JOB_TASK_STATUS_SUCCEED:
+ case JOB_TASK_STATUS_FAILED:
+ case JOB_TASK_STATUS_CANCELLING:
+ if (newStatus != JOB_TASK_STATUS_CANCELLED) {
+ SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
+ }
+
+ break;
+ case JOB_TASK_STATUS_CANCELLED:
+ case JOB_TASK_STATUS_DROPPING:
+ SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
+ break;
+
+ default:
+ qError("invalid task status:%d", oriStatus);
+ return TSDB_CODE_QRY_APP_ERROR;
+ }
+
+ return TSDB_CODE_SUCCESS;
+
+_return:
+
+ SCH_JOB_ELOG("invalid job status update, from %d to %d", oriStatus, newStatus);
+ SCH_ERR_RET(code);
+}
+
+
+int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
+ for (int32_t i = 0; i < pJob->levelNum; ++i) {
+ SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
- for (int32_t m = 0; m < level->taskNum; ++m) {
- SSchTask *task = taosArrayGet(level->subTasks, m);
- SSubplan *plan = task->plan;
- int32_t childNum = plan->pChildren ? (int32_t)taosArrayGetSize(plan->pChildren) : 0;
- int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0;
+ for (int32_t m = 0; m < pLevel->taskNum; ++m) {
+ SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
+ SSubplan *pPlan = pTask->plan;
+ int32_t childNum = pPlan->pChildren ? (int32_t)taosArrayGetSize(pPlan->pChildren) : 0;
+ int32_t parentNum = pPlan->pParents ? (int32_t)taosArrayGetSize(pPlan->pParents) : 0;
if (childNum > 0) {
- task->children = taosArrayInit(childNum, POINTER_BYTES);
- if (NULL == task->children) {
- qError("taosArrayInit %d failed", childNum);
+ if (pJob->levelIdx == pLevel->level) {
+ SCH_JOB_ELOG("invalid query plan, lowest level, childNum:%d", childNum);
+ SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
+ }
+
+ pTask->children = taosArrayInit(childNum, POINTER_BYTES);
+ if (NULL == pTask->children) {
+ SCH_TASK_ELOG("taosArrayInit %d children failed", childNum);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
for (int32_t n = 0; n < childNum; ++n) {
- SSubplan **child = taosArrayGet(plan->pChildren, n);
+ SSubplan **child = taosArrayGet(pPlan->pChildren, n);
SSchTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES);
if (NULL == childTask || NULL == *childTask) {
- qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
+ SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
- if (NULL == taosArrayPush(task->children, childTask)) {
- qError("taosArrayPush failed");
+ if (NULL == taosArrayPush(pTask->children, childTask)) {
+ SCH_TASK_ELOG("taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
if (parentNum > 0) {
- task->parents = taosArrayInit(parentNum, POINTER_BYTES);
- if (NULL == task->parents) {
- qError("taosArrayInit %d failed", parentNum);
+ if (0 == pLevel->level) {
+ SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", parentNum);
+ SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
+ }
+
+ pTask->parents = taosArrayInit(parentNum, POINTER_BYTES);
+ if (NULL == pTask->parents) {
+ SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
for (int32_t n = 0; n < parentNum; ++n) {
- SSubplan **parent = taosArrayGet(plan->pParents, n);
+ SSubplan **parent = taosArrayGet(pPlan->pParents, n);
SSchTask **parentTask = taosHashGet(planToTask, parent, POINTER_BYTES);
if (NULL == parentTask || NULL == *parentTask) {
- qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
+ SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
- if (NULL == taosArrayPush(task->parents, parentTask)) {
- qError("taosArrayPush failed");
+ if (NULL == taosArrayPush(pTask->parents, parentTask)) {
+ SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
- }
+ }
+
+ SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum);
}
}
- SSchLevel *level = taosArrayGet(job->levels, 0);
- if (job->attr.queryJob && level->taskNum > 1) {
- qError("invalid plan info, level 0, taskNum:%d", level->taskNum);
- SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
- }
-
- SSchTask *task = taosArrayGet(level->subTasks, 0);
- if (task->parents && taosArrayGetSize(task->parents) > 0) {
- qError("invalid plan info, level 0, parentNum:%d", (int32_t)taosArrayGetSize(task->parents));
+ SSchLevel *pLevel = taosArrayGet(pJob->levels, 0);
+ if (pJob->attr.queryJob && pLevel->taskNum > 1) {
+ SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
return TSDB_CODE_SUCCESS;
}
-static SSchTask initTask(SSchJob* pJob, SSubplan* plan, SSchLevel *pLevel) {
- SSchTask task = {0};
- if (plan->type == QUERY_TYPE_MODIFY) {
- pJob->attr.needFetch = false;
- } else {
- pJob->attr.queryJob = true;
- }
+int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *pLevel) {
+ pTask->plan = pPlan;
+ pTask->level = pLevel;
+ SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
+ pTask->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
- task.plan = plan;
- task.level = pLevel;
- task.status = JOB_TASK_STATUS_NOT_START;
- task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
-
- return task;
+ return TSDB_CODE_SUCCESS;
}
-static void cleanupTask(SSchTask* pTask) {
+void schFreeTask(SSchTask* pTask) {
taosArrayDestroy(pTask->candidateAddrs);
}
-int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *pJob) {
+int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) {
int32_t code = 0;
- pJob->queryId = dag->queryId;
+ pJob->queryId = pDag->queryId;
- if (dag->numOfSubplans <= 0) {
- qError("invalid subplan num:%d", dag->numOfSubplans);
+ if (pDag->numOfSubplans <= 0) {
+ SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
- int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans);
+ int32_t levelNum = (int32_t)taosArrayGetSize(pDag->pSubplans);
if (levelNum <= 0) {
- qError("invalid level num:%d", levelNum);
+ SCH_JOB_ELOG("invalid level num:%d", levelNum);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_TASK_NUMBER, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (NULL == planToTask) {
- qError("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER);
+ SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
if (NULL == pJob->levels) {
- qError("taosArrayInit %d failed", levelNum);
+ SCH_JOB_ELOG("taosArrayInit %d failed", levelNum);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
- //??
- pJob->attr.needFetch = true;
-
pJob->levelNum = levelNum;
pJob->levelIdx = levelNum - 1;
- pJob->subPlans = dag->pSubplans;
+ pJob->subPlans = pDag->pSubplans;
SSchLevel level = {0};
- SArray *levelPlans = NULL;
- int32_t levelPlanNum = 0;
+ SArray *plans = NULL;
+ int32_t taskNum = 0;
SSchLevel *pLevel = NULL;
level.status = JOB_TASK_STATUS_NOT_START;
for (int32_t i = 0; i < levelNum; ++i) {
if (NULL == taosArrayPush(pJob->levels, &level)) {
- qError("taosArrayPush failed");
+ SCH_JOB_ELOG("taosArrayPush level failed, level:%d", i);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pLevel = taosArrayGet(pJob->levels, i);
pLevel->level = i;
- levelPlans = taosArrayGetP(dag->pSubplans, i);
- if (NULL == levelPlans) {
- qError("no level plans for level %d", i);
- SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
- }
-
- levelPlanNum = (int32_t)taosArrayGetSize(levelPlans);
- if (levelPlanNum <= 0) {
- qError("invalid level plans number:%d, level:%d", levelPlanNum, i);
- SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
- }
-
- pLevel->taskNum = levelPlanNum;
- pLevel->subTasks = taosArrayInit(levelPlanNum, sizeof(SSchTask));
+ plans = taosArrayGetP(pDag->pSubplans, i);
+ if (NULL == plans) {
+ SCH_JOB_ELOG("empty level plan, level:%d", i);
+ SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
+ }
+
+ taskNum = (int32_t)taosArrayGetSize(plans);
+ if (taskNum <= 0) {
+ SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i);
+ SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
+ }
+
+ pLevel->taskNum = taskNum;
+
+ pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask));
if (NULL == pLevel->subTasks) {
- qError("taosArrayInit %d failed", levelPlanNum);
+ SCH_JOB_ELOG("taosArrayInit %d failed", taskNum);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
- for (int32_t n = 0; n < levelPlanNum; ++n) {
- SSubplan *plan = taosArrayGetP(levelPlans, n);
- if (plan->type == QUERY_TYPE_MODIFY) {
- pJob->attr.needFetch = false;
- } else {
- pJob->attr.queryJob = true;
- }
+ for (int32_t n = 0; n < taskNum; ++n) {
+ SSubplan *plan = taosArrayGetP(plans, n);
- SSchTask task = initTask(pJob, plan, pLevel);
+ SCH_SET_JOB_TYPE(&pJob->attr, plan->type);
+
+ SSchTask task = {0};
+ SSchTask *pTask = &task;
+
+ schInitTask(pJob, &task, plan, pLevel);
+
void *p = taosArrayPush(pLevel->subTasks, &task);
if (NULL == p) {
- qError("taosArrayPush failed");
+ SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) {
- qError("taosHashPut failed");
+ SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
+
+ SCH_TASK_DLOG("task initialized, level:%d", pLevel->level);
}
+
+ SCH_JOB_DLOG("level initialized, taskNum:%d", taskNum);
}
SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));
- if (planToTask) {
- taosHashCleanup(planToTask);
- }
-
- return TSDB_CODE_SUCCESS;
-
_return:
- if (pLevel->subTasks) {
- taosArrayDestroy(pLevel->subTasks);
- }
if (planToTask) {
taosHashCleanup(planToTask);
@@ -225,42 +285,49 @@ _return:
SCH_RET(code);
}
-int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) {
- if (task->candidateAddrs) {
+int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
+ if (NULL != pTask->candidateAddrs) {
return TSDB_CODE_SUCCESS;
}
- task->candidateIdx = 0;
- task->candidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
- if (NULL == task->candidateAddrs) {
- qError("taosArrayInit failed");
+ pTask->candidateIdx = 0;
+ pTask->candidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
+ if (NULL == pTask->candidateAddrs) {
+ SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CONDIDATE_EP_NUM);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
- if (task->plan->execNode.numOfEps > 0) {
- if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) {
- qError("taosArrayPush failed");
+ if (pTask->plan->execNode.numOfEps > 0) {
+ if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
+ SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
+ SCH_TASK_DLOG("use execNode from plan as candidate addr, numOfEps:%d", pTask->plan->execNode.numOfEps);
+
return TSDB_CODE_SUCCESS;
}
int32_t addNum = 0;
-
- if (job->nodeList) {
- int32_t nodeNum = (int32_t) taosArrayGetSize(job->nodeList);
+ int32_t nodeNum = 0;
+ if (pJob->nodeList) {
+ nodeNum = taosArrayGetSize(pJob->nodeList);
+
for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) {
- SQueryNodeAddr *naddr = taosArrayGet(job->nodeList, i);
-
- if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) {
- qError("taosArrayPush failed");
+ SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i);
+
+ if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
+ SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
-
- ++addNum;
}
}
+
+ if (addNum <= 0) {
+ SCH_TASK_ELOG("no available execNode as condidate addr, nodeNum:%d", nodeNum);
+ return TSDB_CODE_QRY_INVALID_INPUT;
+ }
+
/*
for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) {
strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
@@ -274,12 +341,19 @@ int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) {
}
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
- if (0 != taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) {
- qError("failed to add new task, taskId:0x%"PRIx64", reqId:0x"PRIx64", out of memory", pJob->queryId);
+ int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
+ if (0 != code) {
+ if (HASH_NODE_EXIST(code)) {
+ SCH_TASK_ELOG("task already in exec list, code:%x", code);
+ SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
+ }
+
+ SCH_TASK_ELOG("taosHashPut task to exec list failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
- qDebug("add one task, taskId:0x%"PRIx64", numOfTasks:%d, reqId:0x%"PRIx64, pTask->taskId, taosHashGetSize(pJob->execTasks), pJob->queryId);
+ SCH_TASK_DLOG("task added to exec list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
+
return TSDB_CODE_SUCCESS;
}
@@ -347,7 +421,7 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *job) {
job->status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
bool needFetch = job->userFetch;
- if ((!job->attr.needFetch) && job->attr.syncSchedule) {
+ if ((!SCH_JOB_NEED_FETCH(&job->attr)) && job->attr.syncSchedule) {
tsem_post(&job->rspSem);
}
@@ -358,14 +432,21 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *job) {
return TSDB_CODE_SUCCESS;
}
-int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) {
- job->status = JOB_TASK_STATUS_FAILED;
- job->errCode = errCode;
+int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
+ int8_t status = SCH_GET_JOB_STATUS(pJob);
- atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
+ if (schValidateStatus(pJob, status, JOB_TASK_STATUS_FAILED)) {
+ SCH_ERR_RET(atomic_load_32(&pJob->errCode));
+ }
+
+ SCH_SET_JOB_STATUS(pJob, JOB_TASK_STATUS_FAILED);
+
+ atomic_store_32(&pJob->errCode, errCode);
- if (job->userFetch || ((!job->attr.needFetch) && job->attr.syncSchedule)) {
- tsem_post(&job->rspSem);
+ atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);
+
+ if (pJob->userFetch || ((!SCH_JOB_NEED_FETCH(&pJob->attr)) && pJob->attr.syncSchedule)) {
+ tsem_post(&pJob->rspSem);
}
return TSDB_CODE_SUCCESS;
@@ -378,50 +459,51 @@ int32_t schProcessOnDataFetched(SSchJob *job) {
}
-int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
+int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
bool moved = false;
- SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved));
+ SCH_ERR_RET(schMoveTaskToSuccList(pJob, pTask, &moved));
if (!moved) {
- SCH_TASK_ERR_LOG(" task may already moved, status:%d", task->status);
+ SCH_TASK_ELOG(" task may already moved, status:%d", pTask->status);
return TSDB_CODE_SUCCESS;
}
- task->status = JOB_TASK_STATUS_SUCCEED;
+ SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
- int32_t parentNum = task->parents ? (int32_t)taosArrayGetSize(task->parents) : 0;
+ int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
if (parentNum == 0) {
- if (task->plan->level != 0) {
- qError("level error");
+ if (pTask->level->level != 0) {
+ SCH_TASK_ELOG("no parent task level error, level:%d", pTask->level->level);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
int32_t taskDone = 0;
- if (SCH_TASK_NEED_WAIT_ALL(task)) {
- SCH_LOCK(SCH_WRITE, &task->level->lock);
- task->level->taskSucceed++;
- taskDone = task->level->taskSucceed + task->level->taskFailed;
- SCH_UNLOCK(SCH_WRITE, &task->level->lock);
+ if (SCH_TASK_NEED_WAIT_ALL(pTask)) {
+ SCH_LOCK(SCH_WRITE, &pTask->level->lock);
+ pTask->level->taskSucceed++;
+ taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
+ SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);
- if (taskDone < task->level->taskNum) {
- qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum);
+ if (taskDone < pTask->level->taskNum) {
+ SCH_TASK_ELOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
return TSDB_CODE_SUCCESS;
+ } else if (taskDone > pTask->level->taskNum) {
+ assert(0);
}
- if (task->level->taskFailed > 0) {
- job->status = JOB_TASK_STATUS_FAILED;
- SCH_ERR_RET(schProcessOnJobFailure(job, TSDB_CODE_QRY_APP_ERROR));
+ if (pTask->level->taskFailed > 0) {
+ pJob->status = JOB_TASK_STATUS_FAILED;
+ SCH_ERR_RET(schProcessOnJobFailure(pJob, TSDB_CODE_QRY_APP_ERROR));
return TSDB_CODE_SUCCESS;
}
} else {
- strncpy(job->resEp.fqdn, task->execAddr.epAddr[task->execAddr.inUse].fqdn, sizeof(job->resEp.fqdn));
- job->resEp.port = task->execAddr.epAddr[task->execAddr.inUse].port;
+ pJob->resNode = pTask->execAddr;
}
- job->fetchTask = task;
- SCH_ERR_RET(schProcessOnJobPartialSuccess(job));
+ pJob->fetchTask = pTask;
+ SCH_ERR_RET(schProcessOnJobPartialSuccess(pJob));
return TSDB_CODE_SUCCESS;
}
@@ -436,53 +518,56 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
*/
for (int32_t i = 0; i < parentNum; ++i) {
- SSchTask *par = *(SSchTask **)taosArrayGet(task->parents, i);
+ SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i);
- ++par->childReady;
+ atomic_add_fetch_32(&par->childReady, 1);
- SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, task->plan->id.templateId, &task->execAddr));
+ SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &pTask->execAddr));
if (SCH_TASK_READY_TO_LUNCH(par)) {
- SCH_ERR_RET(schLaunchTask(job, par));
+ SCH_ERR_RET(schLaunchTask(pJob, par));
}
}
return TSDB_CODE_SUCCESS;
}
-int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
+int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
bool needRetry = false;
bool moved = false;
int32_t taskDone = 0;
- SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry));
+ SCH_ERR_RET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry));
if (!needRetry) {
- SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode);
+ SCH_TASK_ELOG("task failed[%x], no more retry", errCode);
- SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved));
- if (!moved) {
- SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status);
- }
+ if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) {
+ SCH_ERR_RET(schMoveTaskToFailList(pJob, pTask, &moved));
+ if (!moved) {
+ SCH_TASK_ELOG("task may already moved, status:%d", pTask->status);
+ }
+ }
+
+ SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED);
- if (SCH_TASK_NEED_WAIT_ALL(task)) {
- SCH_LOCK(SCH_WRITE, &task->level->lock);
- task->level->taskFailed++;
- taskDone = task->level->taskSucceed + task->level->taskFailed;
- SCH_UNLOCK(SCH_WRITE, &task->level->lock);
+ if (SCH_TASK_NEED_WAIT_ALL(pTask)) {
+ SCH_LOCK(SCH_WRITE, &pTask->level->lock);
+ pTask->level->taskFailed++;
+ taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
+ SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);
- if (taskDone < task->level->taskNum) {
- qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum);
+ if (taskDone < pTask->level->taskNum) {
+ qDebug("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
return TSDB_CODE_SUCCESS;
}
}
-
- job->status = JOB_TASK_STATUS_FAILED;
- SCH_ERR_RET(schProcessOnJobFailure(job, errCode));
+
+ SCH_ERR_RET(schProcessOnJobFailure(pJob, errCode));
- return TSDB_CODE_SUCCESS;
+ return errCode;
}
- SCH_ERR_RET(schLaunchTask(job, task));
+ SCH_ERR_RET(schLaunchTask(pJob, pTask));
return TSDB_CODE_SUCCESS;
}
@@ -659,13 +744,13 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t
int32_t code = 0;
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
- qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
+ qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SMsgSendInfo));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SSchCallbackParam *param = calloc(1, sizeof(SSchCallbackParam));
if (NULL == param) {
- qError("calloc %d failed", (int32_t)sizeof(SSchCallbackParam));
+ qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SSchCallbackParam));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
@@ -683,11 +768,13 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t
pMsgSendInfo->fp = fp;
int64_t transporterId = 0;
+
SCH_ERR_JRET(asyncSendMsgToServer(transport, epSet, &transporterId, pMsgSendInfo));
return TSDB_CODE_SUCCESS;
_return:
+
tfree(param);
tfree(pMsgSendInfo);
@@ -705,109 +792,101 @@ void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) {
}
-int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
+int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
uint32_t msgSize = 0;
void *msg = NULL;
int32_t code = 0;
+ SEpSet epSet;
+
+ SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
+
+ schConvertAddrToEpSet(addr, &epSet);
switch (msgType) {
case TDMT_VND_CREATE_TABLE:
case TDMT_VND_SUBMIT: {
- if (NULL == task->msg || task->msgLen <= 0) {
- qError("submit msg is NULL");
- SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
- }
-
- msgSize = task->msgLen;
- msg = task->msg;
+ msgSize = pTask->msgLen;
+ msg = pTask->msg;
break;
}
case TDMT_VND_QUERY: {
- if (NULL == task->msg) {
- qError("query msg is NULL");
- SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
- }
-
- msgSize = sizeof(SSubQueryMsg) + task->msgLen;
+ msgSize = sizeof(SSubQueryMsg) + pTask->msgLen;
msg = calloc(1, msgSize);
if (NULL == msg) {
- qError("calloc %d failed", msgSize);
+ SCH_TASK_ELOG("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SSubQueryMsg *pMsg = msg;
- pMsg->header.vgId = htonl(task->plan->execNode.nodeId);
+ pMsg->header.vgId = htonl(addr->nodeId);
+
pMsg->sId = htobe64(schMgmt.sId);
- pMsg->queryId = htobe64(job->queryId);
- pMsg->taskId = htobe64(task->taskId);
- pMsg->contentLen = htonl(task->msgLen);
- memcpy(pMsg->msg, task->msg, task->msgLen);
+ pMsg->queryId = htobe64(pJob->queryId);
+ pMsg->taskId = htobe64(pTask->taskId);
+ pMsg->contentLen = htonl(pTask->msgLen);
+ memcpy(pMsg->msg, pTask->msg, pTask->msgLen);
break;
}
case TDMT_VND_RES_READY: {
msgSize = sizeof(SResReadyMsg);
msg = calloc(1, msgSize);
if (NULL == msg) {
- qError("calloc %d failed", msgSize);
+ SCH_TASK_ELOG("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SResReadyMsg *pMsg = msg;
- pMsg->header.vgId = htonl(task->plan->execNode.nodeId);
+ pMsg->header.vgId = htonl(addr->nodeId);
+
pMsg->sId = htobe64(schMgmt.sId);
- pMsg->queryId = htobe64(job->queryId);
- pMsg->taskId = htobe64(task->taskId);
+ pMsg->queryId = htobe64(pJob->queryId);
+ pMsg->taskId = htobe64(pTask->taskId);
break;
}
case TDMT_VND_FETCH: {
- if (NULL == task) {
- SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
- }
msgSize = sizeof(SResFetchMsg);
msg = calloc(1, msgSize);
if (NULL == msg) {
- qError("calloc %d failed", msgSize);
+ SCH_TASK_ELOG("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SResFetchMsg *pMsg = msg;
- pMsg->header.vgId = htonl(task->plan->execNode.nodeId);
+ pMsg->header.vgId = htonl(addr->nodeId);
+
pMsg->sId = htobe64(schMgmt.sId);
- pMsg->queryId = htobe64(job->queryId);
- pMsg->taskId = htobe64(task->taskId);
+ pMsg->queryId = htobe64(pJob->queryId);
+ pMsg->taskId = htobe64(pTask->taskId);
break;
}
case TDMT_VND_DROP_TASK:{
msgSize = sizeof(STaskDropMsg);
msg = calloc(1, msgSize);
if (NULL == msg) {
- qError("calloc %d failed", msgSize);
+ SCH_TASK_ELOG("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
STaskDropMsg *pMsg = msg;
- pMsg->header.vgId = htonl(task->plan->execNode.nodeId);
+ pMsg->header.vgId = htonl(addr->nodeId);
+
pMsg->sId = htobe64(schMgmt.sId);
- pMsg->queryId = htobe64(job->queryId);
- pMsg->taskId = htobe64(task->taskId);
+ pMsg->queryId = htobe64(pJob->queryId);
+ pMsg->taskId = htobe64(pTask->taskId);
break;
}
default:
- qError("unknown msg type:%d", msgType);
+ SCH_TASK_ELOG("unknown msg type:%d", msgType);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
break;
}
- SEpSet epSet;
- SQueryNodeAddr *addr = taosArrayGet(task->candidateAddrs, task->candidateIdx);
-
- schConvertAddrToEpSet(addr, &epSet);
- SCH_ERR_JRET(schAsyncSendMsg(job->transport, &epSet, job->queryId, task->taskId, msgType, msg, msgSize));
+ SCH_ERR_JRET(schAsyncSendMsg(pJob->transport, &epSet, pJob->queryId, pTask->taskId, msgType, msg, msgSize));
return TSDB_CODE_SUCCESS;
@@ -817,33 +896,64 @@ _return:
SCH_RET(code);
}
-
-int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
- SSubplan *plan = task->plan;
- SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen));
- SCH_ERR_RET(schSetTaskCandidateAddrs(job, task));
-
- if (NULL == task->candidateAddrs || taosArrayGetSize(task->candidateAddrs) <= 0) {
- SCH_TASK_ERR_LOG("no valid candidate node for task:%"PRIx64, task->taskId);
- SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
+static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
+ int8_t status = SCH_GET_JOB_STATUS(pJob);
+ if (pStatus) {
+ *pStatus = status;
}
- // NOTE: race condition: the task should be put into the hash table before send msg to server
- SCH_ERR_RET(schPushTaskToExecList(job, task));
- SCH_ERR_RET(schBuildAndSendMsg(job, task, plan->msgType));
-
- task->status = JOB_TASK_STATUS_EXECUTING;
- return TSDB_CODE_SUCCESS;
+ return (status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED
+ || status == JOB_TASK_STATUS_CANCELLING || status == JOB_TASK_STATUS_DROPPING);
}
-int32_t schLaunchJob(SSchJob *job) {
- SSchLevel *level = taosArrayGet(job->levels, job->levelIdx);
- for (int32_t i = 0; i < level->taskNum; ++i) {
- SSchTask *task = taosArrayGet(level->subTasks, i);
- SCH_ERR_RET(schLaunchTask(job, task));
+int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
+ int8_t status = 0;
+ int32_t code = 0;
+
+ if (schJobNeedToStop(pJob, &status)) {
+ SCH_TASK_ELOG("no need to launch task cause of job status, job status:%d", status);
+ SCH_ERR_RET(atomic_load_32(&pJob->errCode));
+ }
+
+ SSubplan *plan = pTask->plan;
+
+ if (NULL == pTask->msg) {
+ code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen);
+ if (TSDB_CODE_SUCCESS != code || NULL == pTask->msg || pTask->msgLen <= 0) {
+ SCH_TASK_ELOG("subplanToString error, code:%x, msg:%p, len:%d", code, pTask->msg, pTask->msgLen);
+ SCH_ERR_JRET(code);
+ }
+ }
+
+ SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
+
+  // NOTE: race condition: the task must be put into the hash table before sending the msg to the server
+ if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) {
+ SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
+
+ SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING);
}
- job->status = JOB_TASK_STATUS_EXECUTING;
+ SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, plan->msgType));
+
+ return TSDB_CODE_SUCCESS;
+
+_return:
+
+ code = schProcessOnTaskFailure(pJob, pTask, code);
+
+ SCH_RET(code);
+}
+
+int32_t schLaunchJob(SSchJob *pJob) {
+ SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx);
+
+ for (int32_t i = 0; i < level->taskNum; ++i) {
+ SSchTask *pTask = taosArrayGet(level->subTasks, i);
+ SCH_ERR_RET(schLaunchTask(pJob, pTask));
+ }
+
+ pJob->status = JOB_TASK_STATUS_EXECUTING;
return TSDB_CODE_SUCCESS;
}
@@ -876,27 +986,90 @@ void schDropJobAllTasks(SSchJob *job) {
}
}
-uint64_t schGenSchId(void) {
- uint64_t sId = 0;
+int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** job, bool syncSchedule) {
+ if (nodeList && taosArrayGetSize(nodeList) <= 0) {
+ qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId);
+ }
- // TODO
+ int32_t code = 0;
+ SSchJob *pJob = calloc(1, sizeof(SSchJob));
+ if (NULL == pJob) {
+ qError("QID:%"PRIx64" calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
+ SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+ }
- qDebug("Gen sId:0x%"PRIx64, sId);
+ pJob->attr.syncSchedule = syncSchedule;
+ pJob->transport = transport;
+ pJob->nodeList = nodeList;
- return sId;
+ SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob));
+
+ pJob->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
+ if (NULL == pJob->execTasks) {
+ SCH_JOB_ELOG("taosHashInit %d execTasks failed", pDag->numOfSubplans);
+ SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+ }
+
+ pJob->succTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
+ if (NULL == pJob->succTasks) {
+ SCH_JOB_ELOG("taosHashInit %d succTasks failed", pDag->numOfSubplans);
+ SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+ }
+
+ pJob->failTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
+ if (NULL == pJob->failTasks) {
+ SCH_JOB_ELOG("taosHashInit %d failTasks failed", pDag->numOfSubplans);
+ SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+ }
+
+ tsem_init(&pJob->rspSem, 0, 0);
+
+ code = taosHashPut(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId), &pJob, POINTER_BYTES);
+ if (0 != code) {
+ if (HASH_NODE_EXIST(code)) {
+ SCH_JOB_ELOG("job already exist, isQueryJob:%d", pJob->attr.queryJob);
+ SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
+ } else {
+ SCH_JOB_ELOG("taosHashPut job failed, errno:%d", errno);
+ SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+ }
+ }
+
+ pJob->status = JOB_TASK_STATUS_NOT_START;
+
+ SCH_ERR_JRET(schLaunchJob(pJob));
+
+ *(SSchJob **)job = pJob;
+
+ if (syncSchedule) {
+ SCH_JOB_DLOG("will wait for rsp now, job status:%d", SCH_GET_JOB_STATUS(pJob));
+ tsem_wait(&pJob->rspSem);
+ }
+
+ SCH_JOB_DLOG("job exec done, job status:%d", SCH_GET_JOB_STATUS(pJob));
+
+ return TSDB_CODE_SUCCESS;
+
+_return:
+
+ *(SSchJob **)job = NULL;
+
+ scheduleFreeJob(pJob);
+
+ SCH_RET(code);
}
int32_t schedulerInit(SSchedulerCfg *cfg) {
if (schMgmt.jobs) {
- qError("scheduler already init");
+ qError("scheduler already initialized");
return TSDB_CODE_QRY_INVALID_INPUT;
}
if (cfg) {
schMgmt.cfg = *cfg;
- if (schMgmt.cfg.maxJobNum <= 0) {
+ if (schMgmt.cfg.maxJobNum == 0) {
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
}
} else {
@@ -905,79 +1078,18 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
schMgmt.jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == schMgmt.jobs) {
- SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum);
- }
-
- schMgmt.sId = schGenSchId();
-
- return TSDB_CODE_SUCCESS;
-}
-
-
-int32_t scheduleExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) {
- if (nodeList && taosArrayGetSize(nodeList) <= 0) {
- qInfo("qnodeList is empty");
- }
-
- int32_t code = 0;
- SSchJob *job = calloc(1, sizeof(SSchJob));
- if (NULL == job) {
+ qError("init schduler jobs failed, num:%u", schMgmt.cfg.maxJobNum);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
- job->attr.syncSchedule = syncSchedule;
- job->transport = transport;
- job->nodeList = nodeList;
-
- SCH_ERR_JRET(schValidateAndBuildJob(pDag, job));
-
- job->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
- if (NULL == job->execTasks) {
- qError("taosHashInit %d failed", pDag->numOfSubplans);
- SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+ if (taosGetSystemUUID((char *)&schMgmt.sId, sizeof(schMgmt.sId))) {
+ qError("generate schdulerId failed, errno:%d", errno);
+ SCH_ERR_RET(TSDB_CODE_QRY_SYS_ERROR);
}
- job->succTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
- if (NULL == job->succTasks) {
- qError("taosHashInit %d failed", pDag->numOfSubplans);
- SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
- }
-
- job->failTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
- if (NULL == job->failTasks) {
- qError("taosHashInit %d failed", pDag->numOfSubplans);
- SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
- }
-
- tsem_init(&job->rspSem, 0, 0);
-
- code = taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES);
- if (0 != code) {
- if (HASH_NODE_EXIST(code)) {
- qError("taosHashPut queryId:0x%"PRIx64" already exist", job->queryId);
- SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
- } else {
- qError("taosHashPut queryId:0x%"PRIx64" failed", job->queryId);
- SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
- }
- }
-
- job->status = JOB_TASK_STATUS_NOT_START;
- SCH_ERR_JRET(schLaunchJob(job));
-
- *(SSchJob **)pJob = job;
-
- if (syncSchedule) {
- tsem_wait(&job->rspSem);
- }
-
- return TSDB_CODE_SUCCESS;
-
-_return:
- *(SSchJob **)pJob = NULL;
- scheduleFreeJob(job);
+ qInfo("scheduler %"PRIx64" initizlized, maxJob:%u", schMgmt.sId, schMgmt.cfg.maxJobNum);
- SCH_RET(code);
+ return TSDB_CODE_SUCCESS;
}
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, SQueryResult *pRes) {
@@ -985,11 +1097,11 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
- SCH_ERR_RET(scheduleExecJobImpl(transport, nodeList, pDag, pJob, true));
+ SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, true));
SSchJob *job = *(SSchJob **)pJob;
- pRes->code = job->errCode;
+ pRes->code = atomic_load_32(&job->errCode);
pRes->numOfRows = job->resNumOfRows;
return TSDB_CODE_SUCCESS;
@@ -1000,7 +1112,7 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag,
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
- return scheduleExecJobImpl(transport, nodeList, pDag, pJob, false);
+ return schExecJobImpl(transport, nodeList, pDag, pJob, false);
}
@@ -1012,14 +1124,14 @@ int32_t scheduleFetchRows(void *pJob, void **data) {
SSchJob *job = pJob;
int32_t code = 0;
- if (!job->attr.needFetch) {
+ if (!SCH_JOB_NEED_FETCH(&job->attr)) {
qError("no need to fetch data");
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
if (job->status == JOB_TASK_STATUS_FAILED) {
job->res = NULL;
- SCH_RET(job->errCode);
+ SCH_RET(atomic_load_32(&job->errCode));
}
if (job->status == JOB_TASK_STATUS_SUCCEED) {
@@ -1039,7 +1151,7 @@ int32_t scheduleFetchRows(void *pJob, void **data) {
tsem_wait(&job->rspSem);
if (job->status == JOB_TASK_STATUS_FAILED) {
- code = job->errCode;
+ code = atomic_load_32(&job->errCode);
}
if (job->res && ((SRetrieveTableRsp *)job->res)->completed) {
@@ -1091,7 +1203,7 @@ void scheduleFreeJob(void *pJob) {
int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
for(int32_t j = 0; j < numOfTasks; ++j) {
SSchTask* pTask = taosArrayGet(pLevel->subTasks, j);
- cleanupTask(pTask);
+ schFreeTask(pTask);
}
taosArrayDestroy(pLevel->subTasks);
diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c
index 75f5e9cdbc..5cb149d53c 100644
--- a/source/util/src/tqueue.c
+++ b/source/util/src/tqueue.c
@@ -112,6 +112,13 @@ bool taosQueueEmpty(STaosQueue *queue) {
return empty;
}
+int32_t taosQueueSize(STaosQueue *queue) {
+ pthread_mutex_lock(&queue->mutex);
+ int32_t numOfItems = queue->numOfItems;
+ pthread_mutex_unlock(&queue->mutex);
+ return numOfItems;
+}
+
void *taosAllocateQitem(int32_t size) {
STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1);