001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.scheduler;
018    
019    import org.apache.activemq.util.IOHelper;
020    import org.apache.activemq.util.ServiceStopper;
021    import org.apache.activemq.util.ServiceSupport;
022    import org.apache.kahadb.index.BTreeIndex;
023    import org.apache.kahadb.journal.Journal;
024    import org.apache.kahadb.journal.Location;
025    import org.apache.kahadb.page.Page;
026    import org.apache.kahadb.page.PageFile;
027    import org.apache.kahadb.page.Transaction;
028    import org.apache.kahadb.util.ByteSequence;
029    import org.apache.kahadb.util.IntegerMarshaller;
030    import org.apache.kahadb.util.LockFile;
031    import org.apache.kahadb.util.StringMarshaller;
032    import org.apache.kahadb.util.VariableMarshaller;
033    import org.slf4j.Logger;
034    import org.slf4j.LoggerFactory;
035    
036    import java.io.DataInput;
037    import java.io.DataOutput;
038    import java.io.File;
039    import java.io.IOException;
040    import java.util.ArrayList;
041    import java.util.HashMap;
042    import java.util.HashSet;
043    import java.util.Iterator;
044    import java.util.List;
045    import java.util.Map;
046    import java.util.Map.Entry;
047    import java.util.Set;
048    
049    public class JobSchedulerStore extends ServiceSupport {
050        static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStore.class);
051        private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
052    
053        public static final int CLOSED_STATE = 1;
054        public static final int OPEN_STATE = 2;
055    
056        private File directory;
057        PageFile pageFile;
058        private Journal journal;
059        private LockFile lockFile;
060        private boolean failIfDatabaseIsLocked;
061        private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
062        private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
063        private boolean enableIndexWriteAsync = false;
064        // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
065        MetaData metaData = new MetaData(this);
066        final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
067        Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
068    
069        protected class MetaData {
070            protected MetaData(JobSchedulerStore store) {
071                this.store = store;
072            }
073            private final JobSchedulerStore store;
074            Page<MetaData> page;
075            BTreeIndex<Integer, Integer> journalRC;
076            BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
077    
078            void createIndexes(Transaction tx) throws IOException {
079                this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId());
080                this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
081            }
082    
083            void load(Transaction tx) throws IOException {
084                this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
085                this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
086                this.storedSchedulers.load(tx);
087                this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
088                this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
089                this.journalRC.load(tx);
090            }
091    
092            void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
093                for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
094                    Entry<String, JobSchedulerImpl> entry = i.next();
095                    entry.getValue().load(tx);
096                    schedulers.put(entry.getKey(), entry.getValue());
097                }
098            }
099    
100            public void read(DataInput is) throws IOException {
101                this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong());
102                this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
103                this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
104                this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
105                this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
106                this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
107            }
108    
109            public void write(DataOutput os) throws IOException {
110                os.writeLong(this.storedSchedulers.getPageId());
111                os.writeLong(this.journalRC.getPageId());
112    
113            }
114        }
115    
116        class MetaDataMarshaller extends VariableMarshaller<MetaData> {
117            private final JobSchedulerStore store;037    import java.i">
038    import java.io.File;
039    import java.io.IOException;
040    import java.util.ArrayList;
041    import java.util.HashMap;
042    import java.util.HashSet;
043    import java.util.Iterator;
044    import java.util.List;
045    import java.util.Map;
046    import java.util.Map.Entry;
047    import java.util.Set;
048    
049    public class JobSchedulerStore extends ServiceSupport {
050        static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStore.class);
051        private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
052    
053        public static final int CLOSED_STATE = 1;
054        public static final int OPEN_STATE = 2;
055    
056        private File directory;
057        PageFile pageFile;
058        private Journal journal;
059        private LockFile lockFile;
060        private boolean failIfDatabaseIsLocked;
061        private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
062        private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
063        private boolean enableIndexWriteAsync = false;
064        // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
065        MetaData metaData = new MetaData(this);
066        final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
067        Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
068    
069        protected class MetaData {
070            protected MetaData(JobSchedulerStore store) {
071                this.store = store;
072            }
073            private final JobSchedulerStore store;
074            Page<MetaData> page;
075            BTreeIndex<Integer, Integer> journalRC;
076            BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
077    
078            void createIndexes(Transaction tx) throws IOException {
079                this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId());
080                this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
081            }
082    
083            void load(Transaction tx) throws IOException {
084                this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
085                this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
086                this.storedSchedulers.load(tx);
087                this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
088                this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
089                this.journalRC.load(tx);
090            }
091    
092            void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
093                for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
094                    Entry<String, JobSchedulerImpl> entry = i.next();
095                    entry.getValue().load(tx);
096                    schedulers.put(entry.getKey(), entry.getValue());
097                }
098            }
099    
100            public void read(DataInput is) throws IOException {
101                this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong());
102                this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
103                this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
104                this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
105                this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
106                this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
107            }
108    
109            public void write(DataOutput os) throws IOException {
110                os.writeLong(this.storedSchedulers.getPageId());
111                os.writeLong(this.journalRC.getPageId());
112    
113            }
114        }
115    
116        class MetaDataMarshaller extends VariableMarshaller<MetaData> {
117            private final JobSchedulerStore store;037    import java.i">
038    import java.io.File;
039    import java.io.IOException;
040    import java.util.ArrayList;
041    import java.util.HashMap;
042    import java.util.HashSet;
043    import java.util.Iterator;
044    import java.util.List;
045    import java.util.Map;
046    import java.util.Map.Entry;
047    import java.util.Set;
048    
049    public class JobSchedulerStore extends ServiceSupport {
050        static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStore.class);
051        private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
052    
053        public static final int CLOSED_STATE = 1;
054        public static final int OPEN_STATE = 2;
055    
056        private File directory;
057        PageFile pageFile;
058        private Journal journal;
059        private LockFile lockFile;
060        private boolean failIfDatabaseIsLocked;
061        private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
062        private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
063        private boolean enableIndexWriteAsync = false;
064        // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
065        MetaData metaData = new MetaData(this);
066        final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
067        Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
068    
069        protected class MetaData {
070            protected MetaData(JobSchedulerStore store) {
071                this.store = store;
072            }
073            private final JobSchedulerStore store;
074            Page<MetaData> page;
075            BTreeIndex<Integer, Integer> journalRC;
076            BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
077    
078            void createIndexes(Transaction tx) throws IOException {
079                this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId());
080                this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
081            }
082    
083            void load(Transaction tx) throws IOException {
084                this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
085                this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
086                this.storedSchedulers.load(tx);
087                this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
088                this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
089                this.journalRC.load(tx);
090            }
091    
092            void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
093                for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
094                    Entry<String, JobSchedulerImpl> entry = i.next();
095                    entry.getValue().load(tx);
096                    schedulers.put(entry.getKey(), entry.getValue());
097                }
098            }
099    
100            public void read(DataInput is) throws IOException {
101                this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong());
102                this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
103                this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
104                this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
105                this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
106                this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
107            }
108    
109            public void write(DataOutput os) throws IOException {
110                os.writeLong(this.storedSchedulers.getPageId());
111                os.writeLong(this.journalRC.getPageId());
112    
113            }
114        }
115    
116        class MetaDataMarshaller extends VariableMarshaller<MetaData> {
117            private final JobSchedulerStore store;037    import java.i">
038    import java.io.File;
039    import java.io.IOException;
040    import java.util.ArrayList;
041    import java.util.HashMap;
042    import java.util.HashSet;
043    import java.util.Iterator;
044    import java.util.List;
045    import java.util.Map;
046    import java.util.Map.Entry;
047    import java.util.Set;
048    
049    public class JobSchedulerStore extends ServiceSupport {
050        static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStore.class);
051        private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
052    
053        public static final int CLOSED_STATE = 1;
054        public static final int OPEN_STATE = 2;
055    
056        private File directory;
057        PageFile pageFile;
058        private Journal journal;
059        private LockFile lockFile;
060        private boolean failIfDatabaseIsLocked;
061        private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
062        private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
063        private boolean enableIndexWriteAsync = false;
064        // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
065        MetaData metaData = new MetaData(this);
066        final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
067        Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
068    
069        protected class MetaData {
070            protected MetaData(JobSchedulerStore store) {
071                this.store = store;
072            }
073            private final JobSchedulerStore store;
074            Page<MetaData> page;
075            BTreeIndex<Integer, Integer> journalRC;
076            BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
077    
078            void createIndexes(Transaction tx) throws IOException {
079                this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId());
080                this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
081            }
082    
083            void load(Transaction tx) throws IOException {
084                this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
085                this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
086                this.storedSchedulers.load(tx);
087                this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
088                this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
089                this.journalRC.load(tx);
090            }
091    
092            void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
093                for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
094                    Entry<String, JobSchedulerImpl> entry = i.next();
095                    entry.getValue().load(tx);
096                    schedulers.put(entry.getKey(), entry.getValue());
097                }
098            }
099    
100            public void read(DataInput is) throws IOException {
101                this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong());
102                this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
103                this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
104                this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
105                this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
106                this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
107            }
108    
109            public void write(DataOutput os) throws IOException {
110                os.writeLong(this.storedSchedulers.getPageId());
111                os.writeLong(this.journalRC.getPageId());
112    
113            }
114        }
115    
116        class MetaDataMarshaller extends VariableMarshaller<MetaData> {
117            private final JobSchedulerStore store;037    import java.i">
038    import java.io.File;
039    import java.io.IOException;
040    import java.util.ArrayList;
041    import java.util.HashMap;
042    import java.util.HashSet;
043    import java.util.Iterator;
044    import java.util.List;
045    import java.util.Map;