001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.scheduler;
018
019 import org.apache.activemq.util.IOHelper;
020 import org.apache.activemq.util.ServiceStopper;
021 import org.apache.activemq.util.ServiceSupport;
022 import org.apache.kahadb.index.BTreeIndex;
023 import org.apache.kahadb.journal.Journal;
024 import org.apache.kahadb.journal.Location;
025 import org.apache.kahadb.page.Page;
026 import org.apache.kahadb.page.PageFile;
027 import org.apache.kahadb.page.Transaction;
028 import org.apache.kahadb.util.ByteSequence;
029 import org.apache.kahadb.util.IntegerMarshaller;
030 import org.apache.kahadb.util.LockFile;
031 import org.apache.kahadb.util.StringMarshaller;
032 import org.apache.kahadb.util.VariableMarshaller;
033 import org.slf4j.Logger;
034 import org.slf4j.LoggerFactory;
035
036 import java.io.DataInput;
037 import java.io.DataOutput;
038 import java.io.File;
039 import java.io.IOException;
040 import java.util.ArrayList;
041 import java.util.HashMap;
042 import java.util.HashSet;
043 import java.util.Iterator;
044 import java.util.List;
045 import java.util.Map;
046 import java.util.Map.Entry;
047 import java.util.Set;
048
049 public class JobSchedulerStore extends ServiceSupport {
050 static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStore.class);
051 private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
052
053 public static final int CLOSED_STATE = 1;
054 public static final int OPEN_STATE = 2;
055
056 private File directory;
057 PageFile pageFile;
058 private Journal journal;
059 private LockFile lockFile;
060 private boolean failIfDatabaseIsLocked;
061 private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
062 private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
063 private boolean enableIndexWriteAsync = false;
064 // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
065 MetaData metaData = new MetaData(this);
066 final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
067 Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
068
069 protected class MetaData {
070 protected MetaData(JobSchedulerStore store) {
071 this.store = store;
072 }
073 private final JobSchedulerStore store;
074 Page<MetaData> page;
075 BTreeIndex<Integer, Integer> journalRC;
076 BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
077
078 void createIndexes(Transaction tx) throws IOException {
079 this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId());
080 this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
081 }
082
083 void load(Transaction tx) throws IOException {
084 this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
085 this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
086 this.storedSchedulers.load(tx);
087 this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
088 this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
089 this.journalRC.load(tx);
090 }
091
092 void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
093 for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
094 Entry<String, JobSchedulerImpl> entry = i.next();
095 entry.getValue().load(tx);
096 schedulers.put(entry.getKey(), entry.getValue());
097 }
098 }
099
100 public void read(DataInput is) throws IOException {
101 this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong());
102 this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
103 this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
104 this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
105 this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
106 this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
107 }
108
109 public void write(DataOutput os) throws IOException {
110 os.writeLong(this.storedSchedulers.getPageId());
111 os.writeLong(this.journalRC.getPageId());
112
113 }
114 }
115
116 class MetaDataMarshaller extends VariableMarshaller<MetaData> {
117 private final JobSchedulerStore store;037 import java.i">
038 import java.io.File;
039 import java.io.IOException;
040 import java.util.ArrayList;
041 import java.util.HashMap;
042 import java.util.HashSet;
043 import java.util.Iterator;
044 import java.util.List;
045 import java.util.Map;
046 import java.util.Map.Entry;
047 import java.util.Set;
048
049 public class JobSchedulerStore extends ServiceSupport {
050 static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStore.class);
051 private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
052
053 public static final int CLOSED_STATE = 1;
054 public static final int OPEN_STATE = 2;
055
056 private File directory;
057 PageFile pageFile;
058 private Journal journal;
059 private LockFile lockFile;
060 private boolean failIfDatabaseIsLocked;
061 private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
062 private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
063 private boolean enableIndexWriteAsync = false;
064 // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
065 MetaData metaData = new MetaData(this);
066 final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
067 Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
068
069 protected class MetaData {
070 protected MetaData(JobSchedulerStore store) {
071 this.store = store;
072 }
073 private final JobSchedulerStore store;
074 Page<MetaData> page;
075 BTreeIndex<Integer, Integer> journalRC;
076 BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
077
078 void createIndexes(Transaction tx) throws IOException {
079 this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId());
080 this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
081 }
082
083 void load(Transaction tx) throws IOException {
084 this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
085 this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
086 this.storedSchedulers.load(tx);
087 this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
088 this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
089 this.journalRC.load(tx);
090 }
091
092 void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
093 for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
094 Entry<String, JobSchedulerImpl> entry = i.next();
095 entry.getValue().load(tx);
096 schedulers.put(entry.getKey(), entry.getValue());
097 }
098 }
099
100 public void read(DataInput is) throws IOException {
101 this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong());
102 this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
103 this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
104 this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
105 this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
106 this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
107 }
108
109 public void write(DataOutput os) throws IOException {
110 os.writeLong(this.storedSchedulers.getPageId());
111 os.writeLong(this.journalRC.getPageId());
112
113 }
114 }
115
116 class MetaDataMarshaller extends VariableMarshaller<MetaData> {
117 private final JobSchedulerStore store;037 import java.i">
038 import java.io.File;
039 import java.io.IOException;
040 import java.util.ArrayList;
041 import java.util.HashMap;
042 import java.util.HashSet;
043 import java.util.Iterator;
044 import java.util.List;
045 import java.util.Map;
046 import java.util.Map.Entry;
047 import java.util.Set;
048
049 public class JobSchedulerStore extends ServiceSupport {
050 static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStore.class);
051 private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
052
053 public static final int CLOSED_STATE = 1;
054 public static final int OPEN_STATE = 2;
055
056 private File directory;
057 PageFile pageFile;
058 private Journal journal;
059 private LockFile lockFile;
060 private boolean failIfDatabaseIsLocked;
061 private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
062 private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
063 private boolean enableIndexWriteAsync = false;
064 // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
065 MetaData metaData = new MetaData(this);
066 final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
067 Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
068
069 protected class MetaData {
070 protected MetaData(JobSchedulerStore store) {
071 this.store = store;
072 }
073 private final JobSchedulerStore store;
074 Page<MetaData> page;
075 BTreeIndex<Integer, Integer> journalRC;
076 BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
077
078 void createIndexes(Transaction tx) throws IOException {
079 this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId());
080 this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
081 }
082
083 void load(Transaction tx) throws IOException {
084 this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
085 this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
086 this.storedSchedulers.load(tx);
087 this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
088 this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
089 this.journalRC.load(tx);
090 }
091
092 void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
093 for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
094 Entry<String, JobSchedulerImpl> entry = i.next();
095 entry.getValue().load(tx);
096 schedulers.put(entry.getKey(), entry.getValue());
097 }
098 }
099
100 public void read(DataInput is) throws IOException {
101 this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong());
102 this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
103 this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
104 this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
105 this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
106 this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
107 }
108
109 public void write(DataOutput os) throws IOException {
110 os.writeLong(this.storedSchedulers.getPageId());
111 os.writeLong(this.journalRC.getPageId());
112
113 }
114 }
115
116 class MetaDataMarshaller extends VariableMarshaller<MetaData> {
117 private final JobSchedulerStore store;037 import java.i">
038 import java.io.File;
039 import java.io.IOException;
040 import java.util.ArrayList;
041 import java.util.HashMap;
042 import java.util.HashSet;
043 import java.util.Iterator;
044 import java.util.List;
045 import java.util.Map;
046 import java.util.Map.Entry;
047 import java.util.Set;
048
049 public class JobSchedulerStore extends ServiceSupport {
050 static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStore.class);
051 private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
052
053 public static final int CLOSED_STATE = 1;
054 public static final int OPEN_STATE = 2;
055
056 private File directory;
057 PageFile pageFile;
058 private Journal journal;
059 private LockFile lockFile;
060 private boolean failIfDatabaseIsLocked;
061 private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
062 private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
063 private boolean enableIndexWriteAsync = false;
064 // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
065 MetaData metaData = new MetaData(this);
066 final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
067 Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
068
069 protected class MetaData {
070 protected MetaData(JobSchedulerStore store) {
071 this.store = store;
072 }
073 private final JobSchedulerStore store;
074 Page<MetaData> page;
075 BTreeIndex<Integer, Integer> journalRC;
076 BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
077
078 void createIndexes(Transaction tx) throws IOException {
079 this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId());
080 this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
081 }
082
083 void load(Transaction tx) throws IOException {
084 this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
085 this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
086 this.storedSchedulers.load(tx);
087 this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
088 this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
089 this.journalRC.load(tx);
090 }
091
092 void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
093 for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
094 Entry<String, JobSchedulerImpl> entry = i.next();
095 entry.getValue().load(tx);
096 schedulers.put(entry.getKey(), entry.getValue());
097 }
098 }
099
100 public void read(DataInput is) throws IOException {
101 this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong());
102 this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
103 this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
104 this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
105 this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
106 this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
107 }
108
109 public void write(DataOutput os) throws IOException {
110 os.writeLong(this.storedSchedulers.getPageId());
111 os.writeLong(this.journalRC.getPageId());
112
113 }
114 }
115
116 class MetaDataMarshaller extends VariableMarshaller<MetaData> {
117 private final JobSchedulerStore store;037 import java.i">
038 import java.io.File;
039 import java.io.IOException;
040 import java.util.ArrayList;
041 import java.util.HashMap;
042 import java.util.HashSet;
043 import java.util.Iterator;
044 import java.util.List;
045 import java.util.Map;