CoherencePushReplicationDB.zip
In the example below I describe a way of developing a custom push replication publisher that publishes data to a database via JDBC. The example can easily be changed to publish data to other receivers (JMS, ...) by changing step 2 and making small changes to step 3; both steps are presented below. I've used Eclipse as the development tool.
To develop a custom push replication publisher we need to go through six steps:
- Step 1: Create a custom publisher scheme class.
- Step 2: Create a custom publisher class that should define what the publisher is doing.
- Step 3: Create a class that performs the actual data actions (publish to JMS, DB, etc.) for the custom publisher.
- Step 4: Register the new publisher against a ContentHandler.
- Step 5: Add the new custom publisher to the cache configuration file.
- Step 6: Add the custom publisher scheme class to the POF configuration file.
The coherence project is attached and conclusions are presented at the end.
Step 1: In the Coherence Eclipse project create a class called CustomPublisherScheme that extends com.oracle.coherence.patterns.pushreplication.publishers.AbstractPublisherScheme. In this class define one property for each child element of the custom-publisher-scheme element.
For instance, for a custom publisher scheme that looks like this:
1: <sync:publisher>
2: <sync:publisher-name>Active2-JDBC-Publisher</sync:publisher-name>
3: <sync:publisher-scheme>
4: <sync:custom-publisher-scheme>
5: <sync:jdbc-string>jdbc:oracle:thin:@machine-name:1521:XE</sync:jdbc-string>
6: <sync:username>hr</sync:username>
7: <sync:password>hr</sync:password>
8: </sync:custom-publisher-scheme>
9: </sync:publisher-scheme>
10: </sync:publisher>
11:
the code is:
1: package com.oracle.coherence;
2:
3: import java.io.DataInput;
4: import java.io.DataOutput;
5: import java.io.IOException;
6: import com.oracle.coherence.patterns.pushreplication.Publisher;
7: import com.oracle.coherence.configuration.Configurable;
8: import com.oracle.coherence.configuration.Mandatory;
9: import com.oracle.coherence.configuration.Property;
10: import com.oracle.coherence.configuration.parameters.ParameterScope;
11: import com.oracle.coherence.environment.Environment;
12: import com.tangosol.io.pof.PofReader;
13: import com.tangosol.io.pof.PofWriter;
14: import com.tangosol.util.ExternalizableHelper;
15:
16: /**
17:  * Publisher scheme behind the custom-publisher-scheme configuration element.
18:  * <p>
19:  * Holds the JDBC connection string, user name and password taken from the
20:  * configuration and passes them to a new CustomPublisher in realize(). The
21:  * three values are written and read in the same order by both the
22:  * DataInput/DataOutput and the POF externalization methods.
23:  */
24: @Configurable
25: public class CustomPublisherScheme
26:         extends
27:         com.oracle.coherence.patterns.pushreplication.publishers.AbstractPublisherScheme
28: {
29:     private static final long serialVersionUID = 1L;
30:
31:     /** POF property index of the JDBC connection string. */
32:     private static final int POF_INDEX_JDBC_STRING = 100;
33:
34:     /** POF property index of the user name. */
35:     private static final int POF_INDEX_USERNAME = 101;
36:
37:     /** POF property index of the password. */
38:     private static final int POF_INDEX_PASSWORD = 102;
39:
40:     private String jdbcString;
41:     private String username;
42:     private String password;
43:
44:     /** Returns the JDBC connection string. */
45:     public String getJdbcString()
46:     {
47:         return jdbcString;
48:     }
49:
50:     /** Sets the JDBC connection string; bound to the mandatory jdbc-string property. */
51:     @Mandatory
52:     @Property("jdbc-string")
53:     public void setJdbcString(String jdbcString)
54:     {
55:         this.jdbcString = jdbcString;
56:     }
57:
58:     /** Returns the database user name. */
59:     public String getUsername()
60:     {
61:         return this.username;
62:     }
63:
64:     /** Sets the database user name; bound to the mandatory username property. */
65:     @Mandatory
66:     @Property("username")
67:     public void setUsername(String username)
68:     {
69:         this.username = username;
70:     }
71:
72:     /** Returns the database password. */
73:     public String getPassword()
74:     {
75:         return this.password;
76:     }
77:
78:     /** Sets the database password; bound to the mandatory password property. */
79:     @Mandatory
80:     @Property("password")
81:     public void setPassword(String password)
82:     {
83:         this.password = password;
84:     }
85:
86:     /**
87:      * Creates the publisher described by this scheme.
88:      *
89:      * @return a new CustomPublisher built from the current getter values; the
90:      *         environment, class loader and parameter scope are not used
91:      */
92:     public Publisher realize(Environment environment, ClassLoader classLoader,
93:             ParameterScope parameterScope)
94:     {
95:         final String url = getJdbcString();
96:         final String user = getUsername();
97:         final String secret = getPassword();
98:         return new CustomPublisher(url, user, secret);
99:     }
100:
101:     /** Reads the superclass state, then the three connection settings. */
102:     @Override
103:     public void readExternal(DataInput in) throws IOException
104:     {
105:         super.readExternal(in);
106:         jdbcString = ExternalizableHelper.readSafeUTF(in);
107:         username = ExternalizableHelper.readSafeUTF(in);
108:         password = ExternalizableHelper.readSafeUTF(in);
109:     }
110:
111:     /** Writes the superclass state, then the three connection settings. */
112:     @Override
113:     public void writeExternal(DataOutput out) throws IOException
114:     {
115:         super.writeExternal(out);
116:         ExternalizableHelper.writeSafeUTF(out, jdbcString);
117:         ExternalizableHelper.writeSafeUTF(out, username);
118:         ExternalizableHelper.writeSafeUTF(out, password);
119:     }
120:
121:     /** POF counterpart of readExternal(DataInput). */
122:     @Override
123:     public void readExternal(PofReader reader) throws IOException
124:     {
125:         super.readExternal(reader);
126:         jdbcString = reader.readString(POF_INDEX_JDBC_STRING);
127:         username = reader.readString(POF_INDEX_USERNAME);
128:         password = reader.readString(POF_INDEX_PASSWORD);
129:     }
130:
131:     /** POF counterpart of writeExternal(DataOutput). */
132:     @Override
133:     public void writeExternal(PofWriter writer) throws IOException
134:     {
135:         super.writeExternal(writer);
136:         writer.writeString(POF_INDEX_JDBC_STRING, jdbcString);
137:         writer.writeString(POF_INDEX_USERNAME, username);
138:         writer.writeString(POF_INDEX_PASSWORD, password);
139:     }
140: }
Step 2: Define what the CustomPublisher should do by creating a new Java class called CustomPublisher that implements com.oracle.coherence.patterns.pushreplication.Publisher:
1: package com.oracle.coherence;
2:
3: import com.oracle.coherence.patterns.pushreplication.EntryOperation;
4: import com.oracle.coherence.patterns.pushreplication.Publisher;
5: import com.oracle.coherence.patterns.pushreplication.exceptions.PublisherNotReadyException;
6: import java.io.BufferedWriter;
7: import java.util.Iterator;
8:
9: public class CustomPublisher implements Publisher
10: {
11: private String jdbcString;
12: private String username;
13: private String password;
14: private transient BufferedWriter bufferedWriter;
15:
16: public CustomPublisher()
17: {
18: }
19:
20: public CustomPublisher(String jdbcString, String username, String password)
21: {
22: this.jdbcString = jdbcString;
23: this.username = username;
24: this.password = password;
25: this.bufferedWriter = null;
26: }
27:
28: public String getJdbcString()
29: {
30: return this.jdbcString;
31: }
32:
33: public String getUsername()
34: {
35: return username;
36: }
37:
38: public String getPassword()
39: {
40: return password;
41: }
42:
43: public void publishBatch(String cacheName, String publisherName,
44: Iterator<EntryOperation> entryOperations)
45: {
46: DatabasePersistence databasePersistence = new DatabasePersistence(
47: jdbcString, username, password);
48: while (entryOperations.hasNext())
49: {
50: EntryOperation entryOperation = (EntryOperation) entryOperations
51: .next();
52: databasePersistence.databasePersist(entryOperation);
53: }
54: }
55:
56: public void start(String cacheName, String publisherName)
57: throws PublisherNotReadyException
58: {
59: System.err
60: .printf("Started: Custom JDBC Publisher for Cache %s with Publisher %s\n",
61: new Object[] { cacheName, publisherName });
62: }
63:
64: public void stop(String cacheName, String publisherName)
65: {
66: System.err
67: .printf("Stopped: Custom JDBC Publisher for Cache %s with Publisher %s\n",
68: new Object[] { cacheName, publisherName });
69: }
70: }
In the publishBatch method above we tell the publisher to persist the data to a database:
1:
2: DatabasePersistence databasePersistence = new DatabasePersistence(
3: jdbcString, username, password);
4: while (entryOperations.hasNext())
5: {
6: EntryOperation entryOperation = (EntryOperation) entryOperations
7: .next();
8: databasePersistence.databasePersist(entryOperation);
9: }
Step 3: The class that deals with the persistence is a very basic one that uses JDBC to perform inserts/updates against a database.
1: package com.oracle.coherence;
2:
3: import com.oracle.coherence.patterns.pushreplication.EntryOperation;
4: import java.sql.*;
5: import java.text.SimpleDateFormat;
6: import com.oracle.coherence.Order;
7:
8: /**
9:  * Very small JDBC helper that inserts or updates rows of the Orders table for
10:  * the entry operations handed over by the custom publisher.
11:  * <p>
12:  * The connection is opened in the constructor. If that fails, the failure is
13:  * printed and the connection stays null; insert and update then fail with an
14:  * IllegalStateException.
15:  */
16: public class DatabasePersistence
17: {
18:     public static final String INSERT_OPERATION = "INSERT";
19:     public static final String UPDATE_OPERATION = "UPDATE";
20:
21:     // Values are bound as parameters instead of being concatenated into the
22:     // statement text, so cache data can never change the SQL that is executed.
23:     private static final String UPDATE_SQL =
24:             "UPDATE Orders set QUANTITY= ?, AMOUNT= ?, ORD_DATE= ? WHERE SYMBOL= ?";
25:     private static final String INSERT_SQL = "insert into Orders values(?, ?, ?, ?)";
26:
27:     private static final String ORDER_DATE_PATTERN = "dd-MMM-yyyy";
28:
29:     public Connection dbConnection;
30:
31:     /**
32:      * Opens the connection that is used for all later statements.
33:      *
34:      * @param jdbcString JDBC connection string
35:      * @param username   database user name
36:      * @param password   database password
37:      */
38:     public DatabasePersistence(String jdbcString, String username,
39:             String password)
40:     {
41:         this.dbConnection = createConnection(jdbcString, username, password);
42:     }
43:
44:     /**
45:      * Loads the Oracle JDBC driver and opens a connection.
46:      *
47:      * @return the new connection, or null when the driver class is missing or
48:      *         the connection attempt fails (the stack trace is printed)
49:      */
50:     public Connection createConnection(String jdbcString, String username,
51:             String password)
52:     {
53:         Connection connection = null;
54:         // The password is deliberately kept out of the log output.
55:         System.err.println("Connecting to: " + jdbcString + " Username: "
56:                 + username);
57:         try
58:         {
59:             // Load the JDBC driver
60:             String driverName = "oracle.jdbc.driver.OracleDriver";
61:             Class.forName(driverName);
62:             // Create a connection to the database
63:             connection = DriverManager.getConnection(jdbcString, username,
64:                     password);
65:             System.err.println("Connected to:" + jdbcString + " Username: "
66:                     + username);
67:         }
68:         catch (ClassNotFoundException e)
69:         {
70:             // driver class is not on the classpath
71:             e.printStackTrace();
72:         }
73:         catch (SQLException e)
74:         {
75:             e.printStackTrace();
76:         }
77:         return connection;
78:     }
79:
80:     /**
81:      * Inserts or updates the Order carried by the entry operation, depending
82:      * on the operation name (compared ignoring case). Any other operation is
83:      * ignored.
84:      */
85:     public void databasePersist(EntryOperation entryOperation)
86:     {
87:         String operation = entryOperation.getOperation().toString();
88:         if (operation.equalsIgnoreCase(INSERT_OPERATION))
89:         {
90:             insert(((Order) entryOperation.getPublishableEntry().getValue()));
91:         }
92:         else if (operation.equalsIgnoreCase(UPDATE_OPERATION))
93:         {
94:             update(((Order) entryOperation.getPublishableEntry().getValue()));
95:         }
96:     }
97:
98:     /**
99:      * Updates quantity, amount and order date of the rows whose SYMBOL equals
100:      * the order's symbol. SQL errors are printed, not rethrown.
101:      *
102:      * @throws IllegalStateException if no connection is available
103:      */
104:     public void update(Order order)
105:     {
106:         Object[] parameters = new Object[] { order.getQuantity(),
107:                 order.getAmount(), formatOrderDate(order), order.getSymbol() };
108:         System.err.println("UPDATE = " + UPDATE_SQL);
109:         executeStatement(UPDATE_SQL, parameters);
110:     }
111:
112:     /**
113:      * Inserts one row with symbol, quantity, amount and order date, in that
114:      * column order. SQL errors are printed, not rethrown.
115:      *
116:      * @throws IllegalStateException if no connection is available
117:      */
118:     public void insert(Order order)
119:     {
120:         Object[] parameters = new Object[] { order.getSymbol(),
121:                 order.getQuantity(), order.getAmount(), formatOrderDate(order) };
122:         System.err.println("INSERT = " + INSERT_SQL);
123:         executeStatement(INSERT_SQL, parameters);
124:     }
125:
126:     /**
127:      * Closes the connection if there is one; SQL errors are printed, not
128:      * rethrown.
129:      */
130:     public void close()
131:     {
132:         Connection connection = getDbConnection();
133:         if (connection == null)
134:         {
135:             return;
136:         }
137:         try
138:         {
139:             connection.close();
140:         }
141:         catch (SQLException ex)
142:         {
143:             System.err.println("SQLException: " + ex.getMessage());
144:         }
145:     }
146:
147:     public Connection getDbConnection()
148:     {
149:         return dbConnection;
150:     }
151:
152:     public void setDbConnection(Connection dbConnection)
153:     {
154:         this.dbConnection = dbConnection;
155:     }
156:
157:     /**
158:      * Formats the order date as dd-MMM-yyyy text, exactly as the statement
159:      * text used to contain it.
160:      */
161:     private static String formatOrderDate(Order order)
162:     {
163:         // NOTE(review): the date is still sent as text and converted by the
164:         // database; binding a java.sql.Date would be independent of the session
165:         // date format, but the type returned by getOrdDate() is not visible
166:         // here - confirm before changing.
167:         return (new SimpleDateFormat(ORDER_DATE_PATTERN)).format(order
168:                 .getOrdDate());
169:     }
170:
171:     /** Binds the parameters in order, runs the statement and always closes it. */
172:     private void executeStatement(String sql, Object[] parameters)
173:     {
174:         Connection connection = getDbConnection();
175:         if (connection == null)
176:         {
177:             throw new IllegalStateException(
178:                     "No database connection available; see the connection error printed earlier");
179:         }
180:         PreparedStatement statement = null;
181:         try
182:         {
183:             statement = connection.prepareStatement(sql);
184:             for (int i = 0; i < parameters.length; i++)
185:             {
186:                 // JDBC parameter indexes start at 1
187:                 statement.setObject(i + 1, parameters[i]);
188:             }
189:             statement.execute();
190:         }
191:         catch (SQLException ex)
192:         {
193:             System.err.println("SQLException: " + ex.getMessage());
194:         }
195:         finally
196:         {
197:             if (statement != null)
198:             {
199:                 try
200:                 {
201:                     statement.close();
202:                 }
203:                 catch (SQLException ex)
204:                 {
205:                     System.err.println("SQLException: " + ex.getMessage());
206:                 }
207:             }
208:         }
209:     }
210: }
Step 4: Now we need to register our publisher against a ContentHandler. To do that, create a new class in the Eclipse project called CustomPushReplicationNamespaceContentHandler that extends com.oracle.coherence.patterns.pushreplication.configuration.PushReplicationNamespaceContentHandler. In the constructor of the new class we register a handler for the custom-publisher-scheme element.
1: package com.oracle.coherence;
2:
3: import com.oracle.coherence.configuration.Configurator;
4: import com.oracle.coherence.environment.extensible.ConfigurationContext;
5: import com.oracle.coherence.environment.extensible.ConfigurationException;
6: import com.oracle.coherence.environment.extensible.ElementContentHandler;
7: import com.oracle.coherence.patterns.pushreplication.PublisherScheme;
8: import com.oracle.coherence.environment.extensible.QualifiedName;
9: import com.oracle.coherence.patterns.pushreplication.configuration.PushReplicationNamespaceContentHandler;
10: import com.tangosol.run.xml.XmlElement;
11:
12: /**
13:  * Namespace content handler that, after the superclass constructor has run,
14:  * additionally registers a handler for the custom-publisher-scheme element.
15:  */
16: public class CustomPushReplicationNamespaceContentHandler extends
17:         PushReplicationNamespaceContentHandler
18: {
19:     /** Registers the handler that turns custom-publisher-scheme into a CustomPublisherScheme. */
20:     public CustomPushReplicationNamespaceContentHandler()
21:     {
22:         super();
23:
24:         ElementContentHandler customSchemeHandler = new ElementContentHandler()
25:         {
26:             /**
27:              * Creates a CustomPublisherScheme and lets the Configurator
28:              * populate it from the XML element.
29:              */
30:             public Object onElement(ConfigurationContext context,
31:                     QualifiedName qualifiedName, XmlElement xmlElement)
32:                     throws ConfigurationException
33:             {
34:                 PublisherScheme scheme = new CustomPublisherScheme();
35:                 Configurator.configure(scheme, context, qualifiedName,
36:                         xmlElement);
37:                 return scheme;
38:             }
39:         };
40:
41:         registerContentHandler("custom-publisher-scheme", customSchemeHandler);
42:     }
43: }
Step 5: Now we should define our CustomPublisher in the cache configuration file according to the following documentation.
1: <cache-config
2: xmlns:sync="class:com.oracle.coherence.CustomPushReplicationNamespaceContentHandler"
3: xmlns:cr="class:com.oracle.coherence.environment.extensible.namespaces.InstanceNamespaceContentHandler">
4: <caching-schemes>
5: <sync:provider pof-enabled="false">
6: <sync:coherence-provider />
7: </sync:provider>
8: <caching-scheme-mapping>
9: <cache-mapping>
10: <cache-name>publishing-cache</cache-name>
11: <scheme-name>distributed-scheme-with-publishing-cachestore</scheme-name>
12: <autostart>true</autostart>
13: <sync:publisher>
14: <sync:publisher-name>Active2 Publisher</sync:publisher-name>
15: <sync:publisher-scheme>
16: <sync:remote-cluster-publisher-scheme>
17: <sync:remote-invocation-service-name>remote-site1
18: </sync:remote-invocation-service-name>
19: <sync:remote-publisher-scheme>
20: <sync:local-cache-publisher-scheme>
21: <sync:target-cache-name>publishing-cache
22: </sync:target-cache-name>
23: </sync:local-cache-publisher-scheme>
24: </sync:remote-publisher-scheme>
25: <sync:autostart>true</sync:autostart>
26: </sync:remote-cluster-publisher-scheme>
27: </sync:publisher-scheme>
28: </sync:publisher>
29: <sync:publisher>
30: <sync:publisher-name>Active2-Output-Publisher</sync:publisher-name>
31: <sync:publisher-scheme>
32: <sync:stderr-publisher-scheme>
33: <sync:autostart>true</sync:autostart>
34: <sync:publish-original-value>true</sync:publish-original-value>
35: </sync:stderr-publisher-scheme>
36: </sync:publisher-scheme>
37: </sync:publisher>
38: <sync:publisher>
39: <sync:publisher-name>Active2-JDBC-Publisher</sync:publisher-name>
40: <sync:publisher-scheme>
41: <sync:custom-publisher-scheme>
42: <sync:jdbc-string>jdbc:oracle:thin:@machine_name:1521:XE
43: </sync:jdbc-string>
44: <sync:username>hr</sync:username>
45: <sync:password>hr</sync:password>
46: </sync:custom-publisher-scheme>
47: </sync:publisher-scheme>
48: </sync:publisher>
49: </cache-mapping>
50: </caching-scheme-mapping>
51: <!-- The following scheme is required for each remote-site when using a
52: RemoteInvocationPublisher -->
53: <remote-invocation-scheme>
54: <service-name>remote-site1</service-name>
55: <initiator-config>
56: <tcp-initiator>
57: <remote-addresses>
58: <socket-address>
59: <address>localhost</address>
60: <port>20001</port>
61: </socket-address>
62: </remote-addresses>
63: <connect-timeout>2s</connect-timeout>
64: </tcp-initiator>
65: <outgoing-message-handler>
66: <request-timeout>5s</request-timeout>
67: </outgoing-message-handler>
68: </initiator-config>
69: </remote-invocation-scheme>
70: <!-- END: com.oracle.coherence.patterns.pushreplication -->
71: <proxy-scheme>
72: <service-name>ExtendTcpProxyService</service-name>
73: <acceptor-config>
74: <tcp-acceptor>
75: <local-address>
76: <address>localhost</address>
77: <port>20002</port>
78: </local-address>
79: </tcp-acceptor>
80: </acceptor-config>
81: <autostart>true</autostart>
82: </proxy-scheme>
83: </caching-schemes>
84: </cache-config>
As you can see in the red-marked text above, I've:
- set the new namespace content handler (the xmlns:sync attribute of the cache-config element);
- defined the new custom publisher, which works together with the other publishers (the stderr and remote-cluster publishers in our case).
Step 6: Add the com.oracle.coherence.CustomPublisherScheme to your custom-pof-config file:
1: <pof-config>
2: <user-type-list>
3: <!-- Built in types -->
4: <include>coherence-pof-config.xml</include>
5: <include>coherence-common-pof-config.xml</include>
6: <include>coherence-messagingpattern-pof-config.xml</include>
7: <include>coherence-pushreplicationpattern-pof-config.xml</include>
8: <!-- Application types -->
9: <user-type>
10: <type-id>1901</type-id>
11: <class-name>com.oracle.coherence.Order</class-name>
12: <serializer>
13: <class-name>com.oracle.coherence.OrderSerializer</class-name>
14: </serializer>
15: </user-type>
16: <user-type>
17: <type-id>1902</type-id>
18: <class-name>com.oracle.coherence.CustomPublisherScheme</class-name>
19: </user-type>
20: </user-type-list>
21: </pof-config>
CONCLUSIONS
This approach allows publishers to publish data to almost any other receiver (database, JMS, MQ, ...). The only thing that needs to be changed is the DatabasePersistence.java class, which should be adapted to the chosen receiver. Only minor changes are needed in the rest of the code (in the publishBatch method of the CustomPublisher class).
No comments:
Post a Comment