/**
* Vertebrae Inc
* @package Cassandra-ORM
* @exports ColumnFamily
*/
"use strict";
const Cassandra = require('./cassandra');
const Model = Cassandra.Model;
const format = require('util').format;
const async = require('async');
const BaseModel = Model.BaseModel;
const AbstractModel = Model.AbstractModel;
const AbstractModelPrototype = AbstractModel.prototype;
const abstractMethods = Object.getOwnPropertyNames(AbstractModelPrototype);
/**
 * Predicate for Array#filter that keeps only the first occurrence of each value.
 * An element passes when the first index at which `indexOf` finds it is its own position.
 * @param {*} value - the element under test
 * @param {number} position - the element's index in the array
 * @param {Array} list - the array being filtered
 * @returns {boolean} true if this is the first occurrence of the value
 */
const arrayUnique = function (value, position, list) {
    const firstSeenAt = list.indexOf(value);
    return firstSeenAt === position;
};
/**
* Create a new Cassandra ColumnFamily attached to a {@link Cassandra.Schema} on the named table, creating the table if it doesn't exist. In general, you should use the instance method {@link Cassandra#model} to attach models
* @memberof Cassandra.Model
* @param {object} db - the Cassandra instance the column family is attached to
* @param {string} name - the name of the column family (table)
* @param {object} schema - the schema describing the table's columns, keys and options
*/
class ColumnFamily extends BaseModel {
    /**
     * Sets up the view/index registries, the instance Factory class and the static mixins.
     * @param {object} db - the Cassandra instance; this class reads `keyspace`, `connected`, `driver` and `queue` from it
     * @param {string} name - the table (column family) name
     * @param {object} schema - the schema; this class reads `statics`, `columns`, `model`, `collections` and `options` from it
     */
    constructor(db, name, schema) {
        // NOTE(review): `model.db`, `model.name`, `model.schema` and `model.primaryKeys` are
        // presumably assigned by BaseModel's constructor - confirm against BaseModel.
        super(db, name, schema);
        const model = this;
        model.views = {};
        model.indexes = {};
        //instance Factory
        model.Factory = class extends Model.ModelInstance {
            constructor(object, bypass) {
                super(object, model, bypass);
            }
            static get name() {
                return model.name;
            }
            static get views() {
                return model.views;
            }
            static get model() {
                return model;
            }
        };
        //mixin abstract model
        for (let method of abstractMethods) {
            if (method === 'constructor') {
                continue;
            }
            //expose ORM specific methods, always invoked with the model as `this`
            Object.defineProperty(model.Factory, method, {
                enumerable: true,
                value: function () {
                    return AbstractModelPrototype[method].apply(model, arguments);
                }
            });
        }
        //mixin statics in both instance and class
        for (let method in schema.statics) {
            let staticMethod = schema.statics[method];
            model[method] = staticMethod;
            model.Factory[method] = function () {
                //return the result so Factory.<static>() behaves like model.<static>()
                return staticMethod.apply(model, arguments);
            };
        }
    }
    /**
     * Creates the table, then its materialized views, then its indexes.
     * When the database is connected the three steps run in series and `callback`
     * receives the first error (if any). When it is not connected each step is
     * invoked without a callback (each step then pushes its queries onto `db.queue`)
     * and `callback` is NOT invoked by this method.
     * @param {function} [callback] - receives err; only called on the connected path
     * @throws {Error} if the schema has already been built for this model
     */
    _buildSchema(callback) {
        const model = this;
        const cassandra = model.db;
        if (model.__$$built) {
            throw new Error('This should not be overriden!');
        }
        model.__$$built = true;
        if (cassandra.connected) {
            async.series([
                (next) => model._createTable(next),
                (next) => model._createViews(next),
                (next) => model._createIndexes(next)
            ], (err) => {
                if (typeof callback === 'function') {
                    callback(err);
                }
            });
        } else {
            model._createTable();
            model._createViews();
            model._createIndexes();
        }
    }
    /**
     * Creates the table described by the model's schema (CREATE TABLE IF NOT EXISTS).
     * If connected the query is executed immediately, otherwise it is pushed onto
     * `db.queue` and `done` is not invoked by this method.
     * @param {function} [done] - receives err, result (connected path only)
     */
    _createTable(done) {
        const model = this;
        const schema = model.schema;
        const dataModel = schema.model;
        const collections = schema.collections;
        const compaction = schema.options.compaction;
        const cassandra = model.db;
        const orderBy = schema.options.orderBy;
        const options = [];
        const joinColumnTypes = (column) => {
            const collection = collections[column];
            if (collection) {
                return format(
                    '%s %s<%s>',
                    column,
                    dataModel[column],
                    //join explicitly: util.format('%s') does not reliably stringify arrays
                    Array.isArray(collection) ? collection.join(', ') : collection
                );
            }
            return column + ' ' + dataModel[column];
        };
        let query = format(
            'CREATE TABLE IF NOT EXISTS %s.%s (%s, %s)',
            cassandra.keyspace,
            model.name,
            schema.columns.map(joinColumnTypes).join(', '),
            model._createPartitionKeyQuery(model.primaryKeys)
        );
        if (orderBy) {
            options.push(format(
                'CLUSTERING ORDER BY (%s)',
                Object.keys(orderBy).map((field) => {
                    return field + ' ' + orderBy[field];
                }).join(', ')
            ));
        }
        if (compaction) {
            //CQL map literals use single quotes where JSON uses double quotes
            options.push('compaction = ' + JSON.stringify(compaction).replace(/"/g, "'"));
        }
        if (options.length) {
            query += ' WITH ' + options.join(' AND ');
        }
        if (cassandra.connected) {
            cassandra.driver.execute(query, done);
        } else {
            cassandra.queue.push((next) => cassandra.driver.execute(query, next));
        }
    }
    /**
     * Creates the materialized views listed in the schema's `options.views`.
     * With no views configured, `done` (if a function) is called immediately.
     * If connected the queries are executed immediately, otherwise a single task
     * running them is pushed onto `db.queue` and `done` is not invoked by this method.
     * @param {function} [done] - receives err (connected path / no views only)
     */
    _createViews(done) {
        const model = this;
        const views = model.schema.options.views;
        const cassandra = model.db;
        if (!views) {
            if (typeof done === 'function') {
                done();
            }
            return;
        }
        const batch = [];
        for (let viewName in views) {
            //done === false makes createView return the query string instead of executing it
            batch.push(model.createView(viewName, views[viewName], false, true));
        }
        if (cassandra.connected) {
            async.each(batch, (query, next) => cassandra.driver.execute(query, next), done);
        } else {
            cassandra.queue.push((next) => {
                async.each(batch, (query, cb) => cassandra.driver.execute(query, cb), next);
            });
        }
    }
    /**
     * Creates the indexes listed in the schema's `options.indexes`.
     * With no indexes configured, `done` (if a function) is called immediately.
     * If connected the queries are executed immediately, otherwise a single task
     * running them is pushed onto `db.queue` and `done` is not invoked by this method.
     * @param {function} [done] - receives err (connected path / no indexes only)
     */
    _createIndexes(done) {
        const model = this;
        const indexes = model.schema.options.indexes;
        const cassandra = model.db;
        if (!indexes || !indexes.length) {
            if (typeof done === 'function') {
                done();
            }
            return;
        }
        const batch = [];
        for (let index of indexes) {
            //done === false makes createIndex return the query string instead of executing it
            batch.push(model.createIndex(index, false, true));
        }
        if (cassandra.connected) {
            async.each(batch, (query, next) => cassandra.driver.execute(query, next), done);
        } else {
            cassandra.queue.push((next) => {
                async.each(batch, (query, cb) => cassandra.driver.execute(query, cb), next);
            });
        }
    }
    /**
     * Create an Index on the current table and record it in `model.indexes`.
     * For a string index the registry maps column -> column; for an object index
     * it maps column -> custom index name. Columns whose model type is 'map' are
     * indexed on their keys (KEYS(column)).
     * @param {string|object} index - a column name whose index name is generated as <table>_<column>_idx, or an object with a single key (the column) whose value is the custom index name to be stored in the database.
     * @param {function|boolean} done - if a function, receives err, result; if strictly `false` the query string is returned instead of being executed
     * @returns {string|undefined} the query string when `done === false`
     * @throws {Error} if the column does not exist in the table model
     * @example
     * createIndex('myIndex', ...); // translates to <table>_myIndex_idx
     * createIndex({myIndex: 'custom_index_mapping'}, ...); // does not get translated
     */
    createIndex(index, done) {
        const model = this;
        const cassandra = model.db;
        const dataModel = model.schema.model;
        const keyspace = cassandra.keyspace;
        //typeof (rather than index.length) so {length: 'name'} is still treated as an object
        const isCustomName = typeof index !== 'string';
        const column = isCustomName ? Object.keys(index)[0] : index;
        if (undefined === dataModel[column]) {
            throw new Error(format('Invalid Index, could not find column "%s" in table model "%s"', column, model.name));
        }
        const indexName = isCustomName
            ? index[column]
            : format('%s_%s_idx', model.name, column);
        model.indexes[column] = isCustomName ? indexName : column;
        const query = format(
            'CREATE INDEX IF NOT EXISTS %s ON %s.%s (%s)',
            indexName,
            keyspace,
            model.name,
            dataModel[column] === 'map' ? format('KEYS(%s)', column) : column
        );
        if (false === done) {
            return query;
        } else if (cassandra.connected) {
            cassandra.driver.execute(query, done);
        } else {
            cassandra.queue.push((next) => cassandra.driver.execute(query, next));
        }
    }
    /**
     * Creates a new materialized view and attaches it to the model {@link Cassandra.Model#views}
     * The create query that is generated will mixin any missing primary keys from the model,
     * this means if your model has a composite key [a,b] and your view sets a composite key of
     * [[c,d],e] the view's key becomes [[c,d],e,a,b]. The missing keys are pushed onto
     * `config.primaryKeys`, i.e. the passed configuration object is mutated.
     * @param {string} viewName - the name of the current view
     * @param {object} config - the view's configuration object (`primaryKeys`, optional `select`, optional `orderBy`)
     * @param {function|boolean} done - if a function, receives err, result. If strictly `false` the query string is returned instead of being executed
     * @param {boolean} noQualify - do not attempt to qualify the configuration
     * @returns {string|undefined} the query string when `done === false`
     * @throws {Error} if the view already exists and `done` is not a function
     */
    createView(viewName, config, done, noQualify) {
        const view = config;
        const model = this;
        const cassandra = model.db;
        if (model.views[viewName]) {
            let error = new Error('View already exists! ' + model.name + ' - ' + viewName);
            if (typeof done === 'function') {
                return done(error);
            } else {
                throw error;
            }
        }
        model.views[viewName] = new Model.MaterializedView(model, viewName, config);
        if (!noQualify) {
            model.schema.qualifyView(viewName, config);
        }
        //flatten [[partition...], clustering...] into a single list of column names
        const flattenKeys = (keys) => {
            return Array.isArray(keys[0])
                ? keys[0].concat(keys.slice(1))
                : keys.slice(0);
        };
        //Mixin model's primary keys
        const primaryKeys = flattenKeys(view.primaryKeys);
        flattenKeys(model.primaryKeys).forEach((key) => {
            if (primaryKeys.indexOf(key) === -1) {
                view.primaryKeys.push(key);
                primaryKeys.push(key);
            }
        });
        const where = primaryKeys.map((column) => {
            return column + ' IS NOT NULL';
        });
        //use the flattened keys so a composite partition key is de-duplicated against `select`
        const selected = (view.select ? view.select.concat(primaryKeys) : primaryKeys)
            .filter(arrayUnique);
        let query = format(
            'CREATE MATERIALIZED VIEW IF NOT EXISTS %s.%s '
            + 'AS SELECT %s FROM %s WHERE %s %s',
            cassandra.keyspace,
            model.views[viewName].qualifiedName,
            selected.join(', '),
            model.name,
            where.join(' AND '),
            model._createPartitionKeyQuery(view.primaryKeys)
        );
        if (view.orderBy) {
            query += format(
                ' WITH CLUSTERING ORDER BY (%s)',
                //join explicitly: util.format('%s') does not reliably stringify arrays
                Object.keys(view.orderBy).map((field) => {
                    return field + ' ' + view.orderBy[field];
                }).join(', ')
            );
        }
        if (false === done) {
            return query;
        } else if (cassandra.connected) {
            cassandra.driver.execute(query, done);
        } else {
            cassandra.queue.push((next) => cassandra.driver.execute(query, next));
        }
    }
}
// The ColumnFamily class is this module's sole export.
module.exports = ColumnFamily;