Source code file content
mercurial / aura / src / com / sun / labs / aura / datastore / impl / store / BerkeleyDataWrapper.java
Size: 51806 bytes, 1 line
/*
* Copyright 2007-2009 Sun Microsystems, Inc. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER
*
* This code is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2
* only, as published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License version 2 for more details (a copy is
* included in the LICENSE file that accompanied this code).
*
* You should have received a copy of the GNU General Public License
* version 2 along with this work; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA
*
* Please contact Sun Microsystems, Inc., 16 Network Circle, Menlo
* Park, CA 94025 or visit www.sun.com if you need additional
* information or have any questions.
*/
package com.sun.labs.aura.datastore.impl.store;
import com.sleepycat.persist.evolve.EvolveEvent;
import com.sun.labs.aura.datastore.impl.store.persist.FieldDescription;
import com.sleepycat.je.CursorConfig;
import com.sun.labs.aura.datastore.DBIterator;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DeadlockException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.EnvironmentStats;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;
import com.sleepycat.persist.EntityCursor;
import com.sleepycat.persist.EntityIndex;
import com.sleepycat.persist.EntityJoin;
import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.ForwardCursor;
import com.sleepycat.persist.PrimaryIndex;
import com.sleepycat.persist.SecondaryIndex;
import com.sleepycat.persist.StoreConfig;
import com.sleepycat.persist.evolve.EvolveConfig;
import com.sleepycat.persist.evolve.EvolveListener;
import com.sleepycat.persist.evolve.EvolveStats;
import com.sleepycat.persist.evolve.Mutations;
import com.sleepycat.persist.model.AnnotationModel;
import com.sleepycat.persist.model.EntityModel;
import com.sun.labs.aura.util.AuraException;
import com.sun.labs.aura.datastore.Attention;
import com.sun.labs.aura.datastore.AttentionConfig;
import com.sun.labs.aura.datastore.Item;
import com.sun.labs.aura.datastore.Item.ItemType;
import com.sun.labs.aura.datastore.impl.Util;
import com.sun.labs.aura.datastore.impl.store.persist.PersistentAttention;
import com.sun.labs.aura.datastore.impl.store.persist.IntAndTimeKey;
import com.sun.labs.aura.datastore.impl.store.persist.UserImpl;
import com.sun.labs.aura.datastore.impl.store.persist.ItemImpl;
import com.sun.labs.aura.datastore.impl.store.persist.StringAndTimeKey;
import com.sun.labs.aura.util.Times;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.io.File;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Provides a wrapper around all the berkeley DB iteractions to isolate it
* from other logic in the item store.
*/
public class BerkeleyDataWrapper {
/**
* The max number of times to retry a deadlocked transaction before
* admitting failure.
*/
protected final static int MAX_DEADLOCK_RETRIES = 10;
/**
* The actual database environment.
*/
protected Environment dbEnv;
/**
* The store inside the environment where all our indexes will live
*/
protected EntityStore store;
/**
* The index of all field descriptions in the store, accessible by field
* name.
*/
protected PrimaryIndex<String, FieldDescription> fieldByName;
/**
* The index of all Items in the store, accessible by key
*/
protected PrimaryIndex<String, ItemImpl> itemByKey;
/**
* The index of all Items in the store, accessible by type
*/
protected SecondaryIndex<Integer, String, ItemImpl> itemByType;
protected SecondaryIndex<IntAndTimeKey, String, ItemImpl> itemByTypeAndTime;
/**
* A subset of the item index that only indexes users
*/
protected SecondaryIndex<Boolean, String, UserImpl> allUsers;
/**
* Only users, indexed by the random string associated with them
*/
protected SecondaryIndex<String, String, UserImpl> usersByRandString;
/**
* The index of all Attention in the item store, accessible by ID
*/
protected PrimaryIndex<Long, PersistentAttention> allAttn;
/**
* The index of all Attention in the item store, accessible by
* the associated item
*/
protected SecondaryIndex<String, Long, PersistentAttention> attnByTargetKey;
/**
* The index of all Attention in the item store, accessible by
* the associated user
*/
protected SecondaryIndex<String, Long, PersistentAttention> attnBySourceKey;
/**
* The index of all Attention in the item store, accessible by
* the type of attention
*/
protected SecondaryIndex<Integer, Long, PersistentAttention> attnByType;
/**
* The index of all Attention, accessible by the timestamp
*/
protected SecondaryIndex<Long, Long, PersistentAttention> attnByTime;
/**
* The index of all Attention in the item store, accessible by
* the composite key of source ID and timestamp.
*/
protected SecondaryIndex<StringAndTimeKey, Long, PersistentAttention> attnBySourceAndTime;
/**
* The index of all Attention in the item store, accessible by
* the composite key of target ID and timestamp
*/
protected SecondaryIndex<StringAndTimeKey, Long, PersistentAttention> attnByTargetAndTime;
/**
* The index of all Attention in the item store, accessible by
* the meta-data string provided by the user
*/
protected SecondaryIndex<String, Long, PersistentAttention> attnByStringVal;
/**
* The index of all Attention in the item store, accessible by
* the meta-data number provided by the user
*/
protected SecondaryIndex<Long, Long, PersistentAttention> attnByNumberVal;
protected Map<String,FieldDescription> fields;
protected Logger log;
/**
* Constructs a database wrapper.
*
* @param dbEnvDir the environment directory for the database
* @param logger a logger to use for messages
* @throws com.sleepycat.je.DatabaseException
*/
public BerkeleyDataWrapper(String dbEnvDir, Logger logger)
throws DatabaseException {
this(dbEnvDir, logger, 60);
}
/**
* Constructs a database wrapper.
*
* @param dbEnvDir the environment directory for the database
* @param logger a logger to use for messages
* @param overwrite true if an existing database should be overwritten
* @throws com.sleepycat.je.DatabaseException
*/
public BerkeleyDataWrapper(String dbEnvDir,
Logger logger,
int cacheSizeMemPercentage)
throws DatabaseException {
this.log = logger;
EnvironmentConfig econf = new EnvironmentConfig();
StoreConfig sconf = new StoreConfig();
econf.setAllowCreate(true);
econf.setTransactional(true);
econf.setCachePercent(cacheSizeMemPercentage);
//
// Set up any mutations -- object version changes
Mutations mutations = new Mutations();
ItemImpl.addMutations(mutations);
sconf.setMutations(mutations);
sconf.setAllowCreate(true);
sconf.setTransactional(true);
econf.setConfigParam(EnvironmentConfig.TXN_DUMP_LOCKS, "true");
//
// Code from Mark Hayes to register persist subclasses -- this is
// a potential work-around to our corruption issue.
EntityModel model = new AnnotationModel();
// register all entity subclasses
model.registerClass(UserImpl.class);
// set the model and create the store
sconf.setModel(model);
File dir = new File(dbEnvDir);
if(!dir.exists()) {
if (!dir.mkdirs()) {
log.severe("Failed to make DB home dir!");
}
}
log.info("BDB opening DB Env...");
dbEnv = new Environment(dir, econf);
log.info("BDB opening Store...");
store = new EntityStore(dbEnv, "Aura", sconf);
//
// Load the indexes that we'll use during regular operation
//itemByID = store.getPrimaryIndex(Long.class, ItemImpl.class);
logger.fine("Opening fieldByName");
fieldByName = store.getPrimaryIndex(String.class, FieldDescription.class);
//itemByKey = store.getSecondaryIndex(itemByID, String.class, "key");
logger.fine("Opening itemByKey");
itemByKey = store.getPrimaryIndex(String.class, ItemImpl.class);
logger.fine("Opening itemByType");
itemByType = store.getSecondaryIndex(itemByKey,
Integer.class,
"itemType");
logger.fine("Opening itemByTypeAndTime");
itemByTypeAndTime = store.getSecondaryIndex(itemByKey,
IntAndTimeKey.class,
"typeAndTimeAdded");
logger.fine("Opening allUsers");
allUsers = store.getSubclassIndex(itemByKey, UserImpl.class,
Boolean.class, "isUser");
logger.fine("Opening usersByRandString");
usersByRandString = store.getSubclassIndex(itemByKey, UserImpl.class,
String.class, "randStr");
logger.fine("Opening allAttn");
allAttn = store.getPrimaryIndex(Long.class,
PersistentAttention.class);
logger.fine("Opening attnByTargetKey");
attnByTargetKey = store.getSecondaryIndex(allAttn,
String.class,
"targetKey");
logger.fine("Opening attnBySourceKey");
attnBySourceKey = store.getSecondaryIndex(allAttn,
String.class,
"sourceKey");
logger.fine("Opening attnByType");
attnByType = store.getSecondaryIndex(allAttn,
Integer.class,
"type");
logger.fine("Opening attnByTime");
attnByTime = store.getSecondaryIndex(allAttn, Long.class, "timeStamp");
logger.fine("Opening attnBySourceAndTime");
attnBySourceAndTime = store.getSecondaryIndex(allAttn,
StringAndTimeKey.class,
"sourceAndTime");
logger.fine("Opening attnByTargetAndTime");
attnByTargetAndTime = store.getSecondaryIndex(allAttn,
StringAndTimeKey.class,
"targetAndTime");
logger.fine("Opening attnByStringVal");
attnByStringVal = store.getSecondaryIndex(allAttn,
String.class,
"metaString");
logger.fine("Opening attnByNumberVal");
attnByNumberVal = store.getSecondaryIndex(allAttn,
Long.class,
"metaLong");
//
// Did we have our invalid attention item stored to keep all 'columns'
// populated with at least one value?
PersistentAttention bad = PersistentAttention.INVALID_ATTN;
AttentionConfig ac = new AttentionConfig();
ac.setSourceKey(bad.getSourceKey());
ac.setTargetKey(bad.getTargetKey());
long cnt = getAttentionCount(ac);
if (cnt < 1) {
try {
logger.fine("Storing the all-fields-filled attention");
putAttention(bad);
} catch (AuraException e) {
logger.fine("Storing the-all-fields-filled attention failed!");
}
}
//
// Start evolving anything that needs evolving
Runnable evolver = new Runnable() {
@Override
public void run() {
log.info("Starting store evolution");
try {
EvolveConfig evconf = new EvolveConfig();
evconf.addClassToEvolve(ItemImpl.class.getName());
//
// Set an evolve listener so that we get stats on the evolve
evconf.setEvolveListener(new EvolveListener() {
@Override
public boolean evolveProgress(EvolveEvent event) {
return true;
}
});
EvolveStats stats = store.evolve(evconf);
log.info("Read " + stats.getNRead() +
" and converted " + stats.getNConverted());
} catch (DatabaseException e) {
log.log(Level.WARNING, "Evolving all objects failed", e);
}
log.info("Done evolving store.");
}
};
Thread t = new Thread(evolver);
t.start();
log.info("BDB done loading");
}
public void defineField(String fieldName,
Item.FieldType fieldType,
EnumSet<Item.FieldCapability> caps) throws AuraException {
try {
FieldDescription fd =
new FieldDescription(fieldName, fieldType, caps);
FieldDescription prev = fieldByName.get(fieldName);
if(prev != null) {
if(!prev.equals(fd)) {
throw new AuraException("Attempt to redefined field " + fieldName +
" using different capabilities or type prev: " +
prev.getCapabilities() + " " + prev.getType() +
" new: " + fd.getCapabilities() + " " + fd.getType());
}
} else {
int numRetries = 0;
while(numRetries < MAX_DEADLOCK_RETRIES) {
Transaction txn = null;
try {
txn = dbEnv.beginTransaction(null, null);
fieldByName.put(txn, fd);
txn.commit();
return;
} catch(DeadlockException e) {
try {
txn.abort();
numRetries++;
} catch(DatabaseException ex) {
throw new AuraException("Txn abort failed", ex);
}
} catch(Exception e) {
try {
if(txn != null) {
txn.abort();
}
} catch(DatabaseException ex) {
}
throw new AuraException("putItem transaction failed", e);
}
}
throw new AuraException("defineField failed for " + fieldName);
}
} catch(DatabaseException ex) {
throw new AuraException("defineField failed getting field description", ex);
}
}
public Map<String,FieldDescription> getFieldDescriptions() {
return new HashMap(fieldByName.map());
}
/**
* Gets the set of all items of a particular type. This could be a large
* set.
*
* @return all users in the item store
*/
public List<Item> getAll(Item.ItemType type) {
List<Item> items = new ArrayList<Item>();
try {
EntityIndex index = itemByType.subIndex(type.ordinal());
EntityCursor<ItemImpl> cur = index.entities();
try {
for(ItemImpl i : cur) {
items.add(i);
}
} finally {
cur.close();
}
} catch(DatabaseException e) {
log.log(Level.WARNING, "Failed to retrieve users", e);
}
return items;
}
public DBIterator<Item> getAllIterator(Item.ItemType type)
throws AuraException {
EntityCursor cur = null;
Transaction txn = null;
try {
TransactionConfig conf = new TransactionConfig();
conf.setReadUncommitted(true);
txn = dbEnv.beginTransaction(null, conf);
txn.setTxnTimeout(0);
if (type != null) {
EntityIndex index = itemByType.subIndex(type.ordinal());
cur = index.entities(txn, CursorConfig.READ_UNCOMMITTED);
} else {
cur = itemByKey.entities(txn, CursorConfig.READ_UNCOMMITTED);
}
} catch(DatabaseException e) {
handleCursorException(cur, txn, e);
}
DBIterator<Item> dbIt = new EntityIterator<Item>(cur, txn);
return dbIt;
}
/**
* Gets an item from the entity store
* @param key the key of the item to fetch
* @return the item or null if the key is unknown
*/
public ItemImpl getItem(String key) {
ItemImpl ret = null;
try {
ret = itemByKey.get(null, key, LockMode.READ_UNCOMMITTED);
} catch(DatabaseException e) {
log.log(Level.WARNING, "getItem() failed to retrieve item (key:" +
key + ")", e);
}
return ret;
}
/**
* Puts an item into the entity store. If the item already exists, it will
* be replaced.
*
* @param item the item to put
* @return the existing entity that was updated or null of the item was
* inserted
*/
public ItemImpl putItem(ItemImpl item) throws AuraException {
ItemImpl ret = null;
int numRetries = 0;
while(numRetries < MAX_DEADLOCK_RETRIES) {
Transaction txn = null;
try {
txn = dbEnv.beginTransaction(null, null);
ret = itemByKey.put(txn, item);
txn.commit();
return ret;
} catch(DeadlockException e) {
try {
txn.abort();
log.finest("Deadlock detected in putting " + item.getKey() + ": " + e.getMessage());
numRetries++;
} catch(DatabaseException ex) {
throw new AuraException("Txn abort failed", ex);
}
} catch(Exception e) {
try {
if(txn != null) {
txn.abort();
}
} catch(DatabaseException ex) {
}
throw new AuraException("putItem transaction failed", e);
}
}
throw new AuraException("putItem failed for " +
item.getType().toString() + ":" + item.getKey() +
" after " + numRetries + " retries");
}
/**
* Deletes an item from the database.
*
* @param itemKey the key of the item to delete
* @throws com.sun.labs.aura.util.AuraException
*/
public void deleteItem(String itemKey) throws AuraException {
int numRetries = 0;
while (numRetries < MAX_DEADLOCK_RETRIES) {
Transaction txn = null;
try {
txn = dbEnv.beginTransaction(null, null);
itemByKey.delete(itemKey);
txn.commit();
return;
} catch (DeadlockException e) {
try {
txn.abort();
numRetries++;
} catch (DatabaseException ex) {
throw new AuraException("Txn abort failed", ex);
}
} catch (Exception e) {
try {
if (txn != null) {
txn.abort();
}
} catch (DatabaseException ex) {
}
throw new AuraException("deleteItem transaction failed", e);
}
}
throw new AuraException("deleteItem failed for " +
itemKey + " after " + numRetries + " retries");
}
public void deleteAttention(List<Long> ids) throws AuraException {
int numRetries = 0;
while (numRetries < MAX_DEADLOCK_RETRIES) {
Transaction txn = null;
try {
txn = dbEnv.beginTransaction(null, null);
for (Long id : ids) {
allAttn.delete(id);
}
txn.commit();
return;
} catch (DeadlockException e) {
try {
txn.abort();
numRetries++;
} catch (DatabaseException ex) {
throw new AuraException("Txn abort failed", ex);
}
} catch (Exception e) {
try {
if (txn != null) {
txn.abort();
}
} catch (DatabaseException ex) {
}
throw new AuraException("deleteItem transaction failed", e);
}
}
throw new AuraException("deleteAttention failed after " +
numRetries + " retries");
}
public DBIterator<ItemImpl> getItemIterator() throws AuraException {
EntityCursor c = null;
DBIterator<ItemImpl> i = null;
Transaction txn = null;
try {
TransactionConfig conf = new TransactionConfig();
conf.setReadUncommitted(true);
txn = dbEnv.beginTransaction(null, conf);
c = itemByKey.entities(txn, CursorConfig.READ_UNCOMMITTED);
i = new EntityIterator<ItemImpl>(c, txn);
} catch (DatabaseException e) {
handleCursorException(c, txn, e);
}
return i;
}
public UserImpl getUserForRandomString(String randStr) throws AuraException {
UserImpl ret = null;
try {
ret = usersByRandString.get(null, randStr, LockMode.READ_UNCOMMITTED);
} catch(DatabaseException e) {
log.log(Level.WARNING, "getUserForRandomString() failed (randStr:" +
randStr + ")", e);
}
return ret;
}
/**
* Puts an attention into the entry store. Attentions should never be
* overwritten.
*
* @param pa the attention
*/
public void putAttention(PersistentAttention pa) throws AuraException {
int numRetries = 0;
while(numRetries < MAX_DEADLOCK_RETRIES) {
Transaction txn = null;
try {
TransactionConfig txConf = new TransactionConfig();
txConf.setWriteNoSync(true);
txn = dbEnv.beginTransaction(null, txConf);
long prevID = pa.getID();
if (!allAttn.putNoOverwrite(txn, pa)) {
log.warning("Failed to insert attention since primary key already exists: " + pa);
try {
throw new AuraException("Failed to insert attention. PrevID was " + prevID);
} catch (AuraException e) {
log.log(Level.WARNING, "", e);
}
}
txn.commit();
return;
} catch(DeadlockException e) {
try {
txn.abort();
numRetries++;
} catch(DatabaseException ex) {
throw new AuraException("Txn abort failed", ex);
}
} catch(DatabaseException e) {
try {
if (txn != null) {
txn.abort();
}
} catch(DatabaseException ex) {
}
throw new AuraException("Transaction failed", e);
}
}
throw new AuraException("putAttn failed for " + pa + " after " +
numRetries + " retries");
}
/**
* Puts attention into the entry store. Attentions should never be
* overwritten.
*
* @param pa the attention
*/
public void putAttention(List<PersistentAttention> pas) throws AuraException {
int numRetries = 0;
while(numRetries < MAX_DEADLOCK_RETRIES) {
Transaction txn = null;
try {
TransactionConfig txConf = new TransactionConfig();
txConf.setWriteNoSync(true);
txn = dbEnv.beginTransaction(null, txConf);
for (PersistentAttention pa : pas) {
allAttn.putNoOverwrite(txn, pa);
}
txn.commit();
return;
} catch(DeadlockException e) {
try {
txn.abort();
numRetries++;
} catch(DatabaseException ex) {
throw new AuraException("Txn abort failed", ex);
}
} catch(DatabaseException e) {
try {
if (txn != null) {
txn.abort();
}
} catch(DatabaseException ex) {
}
throw new AuraException("Transaction failed", e);
}
}
throw new AuraException("putAttns failed for <list> after " +
numRetries + " retries");
}
/**
* Delete all the attention that has as a source or target the given item.
* This should be used to clean up after an item was deleted.
*
* @param itemKey the key of the item
* @throws com.sun.labs.aura.util.AuraException
*/
public void removeAttention(String itemKey)
throws AuraException {
int numRetries = 0;
while(numRetries < MAX_DEADLOCK_RETRIES) {
//
// Get all the attentions for which this item was a source and delete
Transaction txn = null;
try {
//
// Get all the attentions for which this item was a source
// and delete them
EntityIndex<Long, PersistentAttention> attns =
attnBySourceKey.subIndex(itemKey);
txn = dbEnv.beginTransaction(null, null);
EntityCursor<PersistentAttention> c = attns.entities(txn, new CursorConfig());
try {
for(PersistentAttention a : c) {
c.delete();
}
} finally {
if(c != null) {
c.close();
}
}
//
// Now do the same, but for attention where itemKey was the target
attns = attnByTargetKey.subIndex(itemKey);
c = attns.entities();
try {
for(PersistentAttention a : c) {
c.delete();
}
} finally {
if(c != null) {
c.close();
}
}
txn.commit();
return;
} catch (DeadlockException ex) {
try {
txn.abort();
numRetries++;
} catch(DatabaseException dex) {
throw new AuraException("Txn abort failed", dex);
}
} catch(DatabaseException ex) {
log.log(Level.WARNING, "Failed to delete attention related to "
+ itemKey, ex);
try {
if (txn != null) {
txn.abort();
}
} catch(DatabaseException dex) {
}
throw new AuraException("Transaction failed", ex);
}
}
throw new AuraException("deleteAttn failed for item " + itemKey
+ " after " + numRetries + " retries");
}
public void removeAttention(String srcKey, String targetKey,
Attention.Type type)
throws AuraException {
//
// Get all matching attention and remove it
EntityJoin<Long, PersistentAttention> join = new EntityJoin(allAttn);
join.addCondition(attnBySourceKey, srcKey);
join.addCondition(attnByTargetKey, targetKey);
join.addCondition(attnByType, type.ordinal());
int numRetries = 0;
while(numRetries < MAX_DEADLOCK_RETRIES) {
Transaction txn = null;
try {
//
// Get all the attention IDs that match the criteria
List<Long> attnIDs = new ArrayList();
ForwardCursor<PersistentAttention> cur = null;
try {
cur = join.entities();
for(PersistentAttention attn : cur) {
attnIDs.add(attn.getID());
}
} finally {
if(cur != null) {
cur.close();
}
}
//
// And delete them
try {
txn = dbEnv.beginTransaction(null, null);
for (Long id : attnIDs) {
allAttn.delete(txn, id);
}
txn.commit();
return;
} catch (DeadlockException e) {
try {
numRetries++;
txn.abort();
} catch (DatabaseException ex) {
throw new AuraException("Txn abort failed", ex);
}
}
} catch (DatabaseException e) {
log.log(Level.WARNING, "Failed to remove Attention", e);
try {
if (txn != null) {
txn.abort();
}
} catch (DatabaseException ex) {
}
throw new AuraException("Remove attention failed", e);
}
}
throw new AuraException("removeAttn failed for src " + srcKey +
" and tgt " + targetKey + " after " + numRetries + " retries");
}
/**
* Gets all the items of a particular type that have been added since a
* particular time. Returns an iterator over those items that must be
* closed when reading is done.
*
* @param itemType the type of item to retrieve
* @param timeStamp the time from which to search (to the present time
* @return an iterator over the added items
* @throws com.sun.labs.aura.util.AuraException
*/
public DBIterator<Item> getItemsAddedSince(ItemType itemType,
long timeStamp) throws AuraException {
//
// We need to get a cursor based on the long & time key that stores
// the item type and the time that it was added. We'll get a cursor
// for only that type and the range of times from the provided
// timestamp to the current time.
IntAndTimeKey begin = new IntAndTimeKey(itemType.ordinal(), timeStamp);
IntAndTimeKey end = new IntAndTimeKey(itemType.ordinal(),
System.currentTimeMillis());
EntityCursor cursor = null;
Transaction txn = null;
try {
TransactionConfig conf = new TransactionConfig();
conf.setReadUncommitted(true);
txn = dbEnv.beginTransaction(null, conf);
//
// This transaction is read-only and it is up to the developer
// to release it. Don't time out the transaction.
txn.setTxnTimeout(0);
//
// Set Read Committed behavior - this ensures the stability of
// the current item being read (puts a read lock on it) but allows
// previously read items to change (releases the read lock after
// reading).
CursorConfig cc = new CursorConfig();
cc.setReadCommitted(true);
cursor = itemByTypeAndTime.entities(txn,
begin, true,
end, true,
cc);
//try {
// cursor.next();
// cursor.prev();
//} catch (IllegalStateException e) {
// return new EntityIterator();
//}
//cursor.prev();
} catch(DatabaseException e) {
handleCursorException(cursor, txn, e);
}
DBIterator<Item> dbIt = new EntityIterator<Item>(cursor, txn);
return dbIt;
}
/**
* Get items with a particular user id, attn type, and item type (answers
* the query: Get me all the items of this type that this user has paid
* this kind of attention to)
*
* @param userID the id of the user
* @param attnType the type of attention
* @param itemType the type of the item
* @return the set of matching items
*/
@Deprecated
public List<Item> getItems(
String userKey,
Attention.Type attnType,
ItemType itemType) throws AuraException {
List<Item> result = new ArrayList<Item>();
//
// First get all the attention of the particular type with the
// particular user
AttentionConfig ac = new AttentionConfig();
ac.setSourceKey(userKey);
ac.setType(attnType);
DBIterator<Attention> attn = getAttentionIterator(ac);
try {
while (attn.hasNext()) {
//
// Now do the in-memory join, looking up each item as we go
Attention a = attn.next();
ItemImpl item = getItem(a.getTargetKey());
if(item.getType() == itemType) {
result.add(item);
}
}
} catch (RemoteException e) {
throw new AuraException("Remote exception on local object!!", e);
} finally {
try {
attn.close();
} catch (RemoteException e) {
}
}
return result;
}
/**
* Gets all the attention that has been added to the store since a
* particular date. Returns an iterator over the attention that must be
* closed when reading is done.
*
* @param timeStamp the time to search back to
* @return the Attentions added since that time
* @throws com.sun.labs.aura.util.AuraException
*/
@SuppressWarnings(value="RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE",
justification="Future-proofing isn't bad")
public DBIterator<Attention> getAttentionAddedSince(long timeStamp)
throws AuraException {
EntityCursor c = null;
Transaction txn = null;
try {
TransactionConfig conf = new TransactionConfig();
conf.setReadUncommitted(true);
txn = dbEnv.beginTransaction(null, conf);
//
// This transaction is read-only and it is up to the developer
// to release it. Don't time out the transaction.
txn.setTxnTimeout(0);
//
// Set Read Committed behavior - this ensures the stability of
// the current item being read (puts a read lock on it) but allows
// previously read items to change (releases the read lock after
// reading).
CursorConfig cc = new CursorConfig();
cc.setReadCommitted(true);
c = attnByTime.entities(txn, timeStamp, true,
System.currentTimeMillis(), true, cc);
} catch(DatabaseException e) {
try {
if(c != null) {
c.close();
}
} catch(DatabaseException ex) {
log.log(Level.WARNING, "Failed to close cursor", ex);
}
try {
if(txn != null) {
txn.abort();
}
} catch(DatabaseException ex) {
log.log(Level.WARNING, "Failed to abort cursor txn", ex);
}
throw new AuraException("getAttentionAddedSince failed", e);
}
DBIterator<Attention> dbIt = new EntityIterator<Attention>(c, txn);
return dbIt;
}
protected EntityJoin<Long, PersistentAttention> getAttentionJoin(
AttentionConfig ac) {
EntityJoin<Long, PersistentAttention> join = new EntityJoin(allAttn);
if (ac.getSourceKey() != null) {
join.addCondition(attnBySourceKey, ac.getSourceKey());
}
if (ac.getTargetKey() != null) {
join.addCondition(attnByTargetKey, ac.getTargetKey());
}
if (ac.getType() != null) {
join.addCondition(attnByType, ac.getType().ordinal());
}
if (ac.getStringVal() != null) {
join.addCondition(attnByStringVal, ac.getStringVal());
}
if (ac.getNumberVal() != null) {
join.addCondition(attnByNumberVal, ac.getNumberVal());
}
return join;
}
public DBIterator<Attention> getAttentionIterator(AttentionConfig ac)
throws AuraException {
EntityJoin<Long,PersistentAttention> join = null;
if (!Util.isEmpty(ac)) {
join = getAttentionJoin(ac);
}
ForwardCursor cur = null;
Transaction txn = null;
try {
TransactionConfig conf = new TransactionConfig();
conf.setReadUncommitted(true);
txn = dbEnv.beginTransaction(null, conf);
txn.setTxnTimeout(0);
if (!Util.isEmpty(ac)) {
cur = join.entities(txn, CursorConfig.READ_UNCOMMITTED);
} else {
cur = allAttn.entities(txn, CursorConfig.READ_UNCOMMITTED);
}
} catch(DatabaseException e) {
handleCursorException(cur, txn, e);
}
DBIterator<Attention> dbIt = new EntityIterator<Attention>(cur, txn);
return dbIt;
}
/**
* Returns a count of attention meeting the given criteria
*
* @param ac
* @return
*/
public Long getAttentionCount(AttentionConfig ac) {
if (ac == null || Util.isEmpty(ac)) {
try {
return allAttn.count();
} catch(DatabaseException e) {
log.log(Level.WARNING, "Failed to get count of all attentions",
e);
}
return 0L;
}
//
// We need to iterate over all the attentions since the DB can't
// tell us the count. If calling this with a single constraint
// is common (for example, only a source key, or only a target key)
// we can probably special case to answer faster by instantiating
// a subIndex for the specific value and calling count() on it.
EntityJoin<Long,PersistentAttention> join = getAttentionJoin(ac);
long ret = 0;
try {
ForwardCursor<PersistentAttention> cur = null;
Transaction txn = null;
try {
TransactionConfig conf = new TransactionConfig();
conf.setReadUncommitted(true);
txn = dbEnv.beginTransaction(null, conf);
cur = join.entities(txn, CursorConfig.READ_UNCOMMITTED);
for(PersistentAttention attn : cur) {
ret++;
}
} finally {
if(cur != null) {
cur.close();
}
if (txn != null) {
txn.commitNoSync();
}
}
} catch(DatabaseException e) {
log.log(Level.WARNING, "Failed to read attention ", e);
}
return ret;
}
public DBIterator<Attention> getAttentionSinceIterator(AttentionConfig ac,
Date timeStamp)
throws AuraException {
if (Util.isEmpty(ac)) {
throw new AuraException("At least one constraint must be " +
"specified before calling getAttentionSince(AttentionConfig)");
}
//
// We'll do the join in the DB, then filter the time on memory
EntityJoin<Long,PersistentAttention> join = getAttentionJoin(ac);
ForwardCursor cur = null;
Transaction txn = null;
try {
TransactionConfig conf = new TransactionConfig();
conf.setReadUncommitted(true);
txn = dbEnv.beginTransaction(null, conf);
txn.setTxnTimeout(0);
cur = join.entities(txn, CursorConfig.READ_UNCOMMITTED);
} catch(DatabaseException e) {
handleCursorException(cur, txn, e);
}
DateFilterEntityIterator dbIt =
new DateFilterEntityIterator(cur, txn, timeStamp);
return dbIt;
}
/**
* Returns a count of attention meeting the given criteria, after the given
* date
*
* @param ac
* @return
*/
public Long getAttentionSinceCount(AttentionConfig ac, Date timeStamp)
throws AuraException {
if (Util.isEmpty(ac)) {
throw new AuraException("At least one constraint must be " +
"specified before calling getAttentionSince(AttentionConfig)");
}
//
// We need to iterate over all the attentions since the DB can't
// tell us the count. If calling this with a single constraint
// is common (for example, only a source key, or only a target key)
// we can probably special case to answer faster by instantiating
// a subIndex for the specific value and calling count() on it.
EntityJoin<Long,PersistentAttention> join = getAttentionJoin(ac);
long ret = 0;
try {
ForwardCursor<PersistentAttention> cur = null;
Transaction txn = null;
try {
TransactionConfig conf = new TransactionConfig();
conf.setReadUncommitted(true);
txn = dbEnv.beginTransaction(null, conf);
cur = join.entities(txn, CursorConfig.READ_UNCOMMITTED);
for(PersistentAttention attn : cur) {
//
// Post process the date filter in memory
if (attn.getTimeStamp() >= timeStamp.getTime()) {
ret++;
}
}
} finally {
if(cur != null) {
cur.close();
}
if (txn != null) {
txn.commitNoSync();
}
}
} catch(DatabaseException e) {
log.log(Level.WARNING, "Failed to read attention ", e);
}
return ret;
}
/**
* Gets the most recent N attentions of a specified type) that
* a user has recorded. This will potentially perform a series of time
* based queries, expanding the query range until enough attentions have
* been found to satisfy the count, or until all attentions within the last
* year have been considered.
*
* @param srcKey the ID of the attention source to query for
* @param type the type of attention to limit to, or null for all attentions
* @param count the desired number of attentions to return
* @return a set of attentions, sorted by date
*/
public List<Attention> getLastAttentionForUser(String srcKey,
Attention.Type type, int count) {
//
// Start querying for attention for this user based on time, expanding
// the time range until we have enough attention.
Set<Attention> results = new HashSet<Attention>();
long recent = System.currentTimeMillis();
// Try one hour first
List<Attention> curr =
getUserAttnForTimePeriod(srcKey, type, recent,
Times.ONE_HOUR, count);
count -= curr.size();
recent -= Times.ONE_HOUR;
results.addAll(curr);
if(count <= 0) {
List<Attention> temp = new ArrayList<Attention>(results);
Collections.sort(temp, new ReverseAttentionTimeComparator());
return temp;
}
//
// Now add in from one hour ago to one day ago
curr = getUserAttnForTimePeriod(srcKey, type, recent,
Times.ONE_DAY, count);
count -= curr.size();
recent -= Times.ONE_DAY;
results.addAll(curr);
if(count <= 0) {
List<Attention> temp = new ArrayList<Attention>(results);
Collections.sort(temp, new ReverseAttentionTimeComparator());
return temp;
}
//
// Now add in from one day ago to one week ago
curr = getUserAttnForTimePeriod(srcKey, type, recent,
Times.ONE_WEEK, count);
count -= curr.size();
recent -= Times.ONE_WEEK;
results.addAll(curr);
if(count <= 0) {
List<Attention> temp = new ArrayList<Attention>(results);
Collections.sort(temp, new ReverseAttentionTimeComparator());
return temp;
}
//
// Now add in from one week ago to one month ago
curr = getUserAttnForTimePeriod(srcKey, type, recent,
Times.ONE_MONTH, count);
count -= curr.size();
recent -= Times.ONE_MONTH;
results.addAll(curr);
if(count <= 0) {
List<Attention> temp = new ArrayList<Attention>(results);
Collections.sort(temp, new ReverseAttentionTimeComparator());
return temp;
}
//
// Finally, expand out to one year.
curr = getUserAttnForTimePeriod(srcKey, type, recent,
Times.ONE_YEAR, count);
//
// Take whatever we got and return it. We won't search back more than
// one year.
results.addAll(curr);
List<Attention> temp = new ArrayList<Attention>(results);
Collections.sort(temp, new ReverseAttentionTimeComparator());
return temp;
}
private List<Attention> getUserAttnForTimePeriod(
String srcKey,
Attention.Type type,
long recentTime,
long interval,
int count) {
List<Attention> result = new ArrayList<Attention>();
//
// Set the begin and end times chronologically
StringAndTimeKey begin = new StringAndTimeKey(srcKey, recentTime -
interval);
StringAndTimeKey end = new StringAndTimeKey(srcKey + 1, recentTime);
EntityCursor<PersistentAttention> cursor = null;
Transaction txn = null;
try {
try {
//
// Examine each item in the cursor in reverse order (newest
// first) to see if it matches our requirements. If so, add
// it to our return set.
TransactionConfig conf = new TransactionConfig();
conf.setReadUncommitted(true);
txn = dbEnv.beginTransaction(null, conf);
cursor = attnBySourceAndTime.entities(txn, begin, true, end, false, CursorConfig.READ_UNCOMMITTED);
PersistentAttention curr = cursor.last();
while(curr != null && count > 0) {
if((type == null) || (curr.getType().equals(type))) {
result.add(curr);
count--;
}
curr = cursor.prev();
}
} finally {
if(cursor != null) {
cursor.close();
}
if (txn != null) {
txn.commitNoSync();
}
}
} catch(DatabaseException e) {
log.log(Level.WARNING, "Failed while retrieving " + count +
" recent attentions for user " + srcKey, e);
}
return result;
}
public boolean isEmpty() {
try {
return itemByKey.count() + allAttn.count() + allUsers.count() == 0;
} catch (DatabaseException e) {
log.log(Level.WARNING, "Failed to test for empty!", e);
return false;
}
}
/**
* Gets the number of items of a particular type that are in the index.
*
* @param type the type of item to count
* @return the number of instances of that item type in the index
*/
public long getItemCount(ItemType type) {
try {
if (type == null) {
return itemByKey.count();
} else {
EntityIndex idx = itemByType.subIndex(type.ordinal());
return idx.count();
}
} catch(DatabaseException e) {
log.log(Level.WARNING, "Failed to get count for items of type " +
type);
}
return 0;
}
/**
* Get the number of user entities in the entity store
*
* @return the number of users or -1 if there was an error
*/
public long getNumUsers() {
long count = -1;
try {
count = allUsers.count();
} catch(DatabaseException e) {
log.log(Level.WARNING, "getNumUsers failed", e);
}
return count;
}
/**
* Close up the entity store and the database environment.
*/
public void close() {
if(store != null) {
try {
System.out.println("BDB closing store");
store.close();
} catch(DatabaseException e) {
System.out.println("Failed to close entity store" + e);
e.printStackTrace();
}
}
if(dbEnv != null) {
try {
System.out.println("BDB closing dbEnv");
dbEnv.close();
} catch(DatabaseException e) {
System.out.println("Failed to close database environment" + e);
e.printStackTrace();
}
}
}
/**
* Gets the size of the database in bytes
*
* @return the size in bytes
*/
public long getSize() {
try {
EnvironmentStats stats = dbEnv.getStats(null);
return stats.getTotalLogSize();
} catch (DatabaseException e) {
log.warning("Failed to get DB stats: " + e.getMessage());
}
return 0;
}
protected void handleCursorException(ForwardCursor cur, Transaction txn, Exception cause)
throws AuraException {
try {
if(cur != null) {
cur.close();
}
} catch(DatabaseException ex) {
log.log(Level.WARNING, "Failed to close cursor", ex);
}
try {
if(txn != null) {
txn.abort();
}
} catch(DatabaseException ex) {
log.log(Level.WARNING, "Failed to abort cursor txn", ex);
}
throw new AuraException("Cursor failed", cause);
}
}






