Thursday, February 21, 2013

One solution for: “weblogic.jms.common.JMSException: No failover destination.”


Today was playing with Uniform distributed Queue in a WLS 12.1.1.0 cluster and after several tests, without any setting changed in the servers, I’ve got this exception:
weblogic.jms.common.JMSException: No failover destination.
In the logs there was another exception that appeared in each of the managed servers after a restart, exactly before the server changed its status to RUNNING.
The exception was:
java.io.NotSerializableException: com.sun.jersey.server.impl.cdi.CDIExtension
According to BUG 13975839, this is an issue in WLS 12.1.1.0 that have been fixed in WLS 12.1.2.0. The BUG description says that: “CDI beans are not replicated as they are not serializable”.
The workaround in WLS 12.1.1.0 is to add this in your managed server start parameters:
-Dcom.sun.jersey.server.impl.cdi.lookupExtensionInBeanManager=true
Adding this start option fixed also my initial issue with jms. Now, regarding the relation between the initial issue and the CDI serialization bug, for the moment I can just guess but after further investigations I’ll update you all – stay tuned…

Thursday, February 7, 2013

Coherence - How to develop a custom push replication publisher


CoherencePushReplicationDB.zip
In the example bellow I'm describing a way of developing a custom push replication publisher that publishes data to a database via JDBC. This example can be easily changed to publish data to other receivers (JMS,...) by performing changes to step 2 and small changes to step 3, steps that are presented bellow. I've used Eclipse as the development tool.
To develop a custom push replication publisher we will need to go through 6 steps:
  • Step 1: Create a custom publisher scheme class
  • Step 2: Create a custom publisher class that should define what the publisher is doing.
  • Step 3: Create a class data that is performing the actions (publish to JMS, DB, etc ) for the custom publisher.
  • Step 4: Register the new publisher against a ContentHandler.
  • Step 5: Add the new custom publisher to the cache configuration file.
  • Step 6: Add the custom publisher scheme class to the POF configuration file.
All these steps are detailed bellow.
The coherence project is attached and conclusions are presented at the end.
Step 1: In the Coherence Eclipse project create a class called CustomPublisherScheme that should implement com.oracle.coherence.patterns.pushreplication.publishers.AbstractPublisherScheme. In this class define the elements of the custom-publisher-scheme element.
For instance for a CustomPublisherScheme that looks like that:
   1: <sync:publisher>

   2:     <sync:publisher-name>Active2-JDBC-Publisher</sync:publisher-name>

   3:     <sync:publisher-scheme>

   4:         <sync:custom-publisher-scheme>

   5:             <sync:jdbc-string>jdbc:oracle:thin:@machine-name:1521:XE</sync:jdbc-string>

   6:             <sync:username>hr</sync:username>

   7:             <sync:password>hr</sync:password>

   8:         </sync:custom-publisher-scheme>

   9:     </sync:publisher-scheme>

  10: </sync:publisher>

  11:  

the code is:


   1: package com.oracle.coherence;

   2:  

   3: import java.io.DataInput;

   4: import java.io.DataOutput;

   5: import java.io.IOException;

   6: import com.oracle.coherence.patterns.pushreplication.Publisher;

   7: import com.oracle.coherence.configuration.Configurable;

   8: import com.oracle.coherence.configuration.Mandatory;

   9: import com.oracle.coherence.configuration.Property;

  10: import com.oracle.coherence.configuration.parameters.ParameterScope;

  11: import com.oracle.coherence.environment.Environment;

  12: import com.tangosol.io.pof.PofReader;

  13: import com.tangosol.io.pof.PofWriter;

  14: import com.tangosol.util.ExternalizableHelper;

  15:  

  16: @Configurable

  17: public class CustomPublisherScheme

  18:         extends

  19:         com.oracle.coherence.patterns.pushreplication.publishers.AbstractPublisherScheme

  20: {

  21:     /**

  22: * 

  23: */

  24:     private static final long serialVersionUID = 1L;

  25:     private String jdbcString;

  26:     private String username;

  27:     private String password;

  28:  

  29:     public String getJdbcString()

  30:     {

  31:         return this.jdbcString;

  32:     }

  33:  

  34:     @Property("jdbc-string")

  35:     @Mandatory

  36:     public void setJdbcString(String jdbcString)

  37:     {

  38:         this.jdbcString = jdbcString;

  39:     }

  40:  

  41:     public String getUsername()

  42:     {

  43:         return username;

  44:     }

  45:  

  46:     @Property("username")

  47:     @Mandatory

  48:     public void setUsername(String username)

  49:     {

  50:         this.username = username;

  51:     }

  52:  

  53:     public String getPassword()

  54:     {

  55:         return password;

  56:     }

  57:  

  58:     @Property("password")

  59:     @Mandatory

  60:     public void setPassword(String password)

  61:     {

  62:         this.password = password;

  63:     }

  64:  

  65:     public Publisher realize(Environment environment, ClassLoader classLoader,

  66:             ParameterScope parameterScope)

  67:     {

  68:         return new CustomPublisher(getJdbcString(), getUsername(),

  69:                 getPassword());

  70:     }

  71:  

  72:     public void readExternal(DataInput in) throws IOException

  73:     {

  74:         super.readExternal(in);

  75:         this.jdbcString = ExternalizableHelper.readSafeUTF(in);

  76:         this.username = ExternalizableHelper.readSafeUTF(in);

  77:         this.password = ExternalizableHelper.readSafeUTF(in);

  78:     }

  79:  

  80:     public void writeExternal(DataOutput out) throws IOException

  81:     {

  82:         super.writeExternal(out);

  83:         ExternalizableHelper.writeSafeUTF(out, this.jdbcString);

  84:         ExternalizableHelper.writeSafeUTF(out, this.username);

  85:         ExternalizableHelper.writeSafeUTF(out, this.password);

  86:     }

  87:  

  88:     public void readExternal(PofReader reader) throws IOException

  89:     {

  90:         super.readExternal(reader);

  91:         this.jdbcString = reader.readString(100);

  92:         this.username = reader.readString(101);

  93:         this.password = reader.readString(102);

  94:     }

  95:  

  96:     public void writeExternal(PofWriter writer) throws IOException

  97:     {

  98:         super.writeExternal(writer);

  99:         writer.writeString(100, this.jdbcString);

 100:         writer.writeString(101, this.username);

 101:         writer.writeString(102, this.password);

 102:     }

 103: }


Step 2: Define what the CustomPublisher should basically do by creating a new java class called CustomPublisher that implements com.oracle.coherence.patterns.pushreplication.Publisher


   1: package com.oracle.coherence;

   2:  

   3: import com.oracle.coherence.patterns.pushreplication.EntryOperation;

   4: import com.oracle.coherence.patterns.pushreplication.Publisher;

   5: import com.oracle.coherence.patterns.pushreplication.exceptions.PublisherNotReadyException;

   6: import java.io.BufferedWriter;

   7: import java.util.Iterator;

   8:  

   9: public class CustomPublisher implements Publisher

  10: {

  11:     private String jdbcString;

  12:     private String username;

  13:     private String password;

  14:     private transient BufferedWriter bufferedWriter;

  15:  

  16:     public CustomPublisher()

  17:     {

  18:     }

  19:  

  20:     public CustomPublisher(String jdbcString, String username, String password)

  21:     {

  22:         this.jdbcString = jdbcString;

  23:         this.username = username;

  24:         this.password = password;

  25:         this.bufferedWriter = null;

  26:     }

  27:  

  28:     public String getJdbcString()

  29:     {

  30:         return this.jdbcString;

  31:     }

  32:  

  33:     public String getUsername()

  34:     {

  35:         return username;

  36:     }

  37:  

  38:     public String getPassword()

  39:     {

  40:         return password;

  41:     }

  42:  

  43:     public void publishBatch(String cacheName, String publisherName,

  44:             Iterator<EntryOperation> entryOperations)

  45:     {

  46:         DatabasePersistence databasePersistence = new DatabasePersistence(

  47:                 jdbcString, username, password);

  48:         while (entryOperations.hasNext())

  49:         {

  50:             EntryOperation entryOperation = (EntryOperation) entryOperations

  51:                     .next();

  52:             databasePersistence.databasePersist(entryOperation);

  53:         }

  54:     }

  55:  

  56:     public void start(String cacheName, String publisherName)

  57:             throws PublisherNotReadyException

  58:     {

  59:         System.err

  60:                 .printf("Started: Custom JDBC Publisher for Cache %s with Publisher %s\n",

  61:                         new Object[] { cacheName, publisherName });

  62:     }

  63:  

  64:     public void stop(String cacheName, String publisherName)

  65:     {

  66:         System.err

  67:                 .printf("Stopped: Custom JDBC Publisher for Cache %s with Publisher %s\n",

  68:                         new Object[] { cacheName, publisherName });

  69:     }

  70: }

In the publishBatch method from above we inform the publisher that he is supposed to persist data to a database:


   1:  

   2: DatabasePersistence databasePersistence = new DatabasePersistence(

   3:                 jdbcString, username, password);

   4:         while (entryOperations.hasNext())

   5:         {

   6:             EntryOperation entryOperation = (EntryOperation) entryOperations

   7:                     .next();

   8:             databasePersistence.databasePersist(entryOperation);

   9:         }

Step 3: The class that deals with the persistence is a very basic one that uses JDBC to perform inserts/updates against a database.


   1: package com.oracle.coherence;

   2:  

   3: import com.oracle.coherence.patterns.pushreplication.EntryOperation;

   4: import java.sql.*;

   5: import java.text.SimpleDateFormat;

   6: import com.oracle.coherence.Order;

   7:  

   8: public class DatabasePersistence

   9: {

  10:     public static String INSERT_OPERATION = "INSERT";

  11:     public static String UPDATE_OPERATION = "UPDATE";

  12:     public Connection dbConnection;

  13:  

  14:     public DatabasePersistence(String jdbcString, String username,

  15:             String password)

  16:     {

  17:         this.dbConnection = createConnection(jdbcString, username, password);

  18:     }

  19:  

  20:     public Connection createConnection(String jdbcString, String username,

  21:             String password)

  22:     {

  23:         Connection connection = null;

  24:         System.err.println("Connecting to: " + jdbcString + " Username: "

  25:                 + username + " Password: " + password);

  26:         try

  27:         {

  28:             // Load the JDBC driver

  29:             String driverName = "oracle.jdbc.driver.OracleDriver";

  30:             Class.forName(driverName);

  31:             // Create a connection to the database

  32:             connection = DriverManager.getConnection(jdbcString, username,

  33:                     password);

  34:             System.err.println("Connected to:" + jdbcString + " Username: "

  35:                     + username + " Password: " + password);

  36:         }

  37:         catch (ClassNotFoundException e)

  38:         {

  39:             e.printStackTrace();

  40:         }

  41:         // driver

  42:         catch (SQLException e)

  43:         {

  44:             e.printStackTrace();

  45:         }

  46:         return connection;

  47:     }

  48:  

  49:     public void databasePersist(EntryOperation entryOperation)

  50:     {

  51:         if (entryOperation.getOperation().toString()

  52:                 .equalsIgnoreCase(INSERT_OPERATION))

  53:         {

  54:             insert(((Order) entryOperation.getPublishableEntry().getValue()));

  55:         }

  56:         else

  57:             if (entryOperation.getOperation().toString()

  58:                     .equalsIgnoreCase(UPDATE_OPERATION))

  59:             {

  60:                 update(((Order) entryOperation.getPublishableEntry().getValue()));

  61:             }

  62:     }

  63:  

  64:     public void update(Order order)

  65:     {

  66:         String update = "UPDATE Orders set QUANTITY= '"

  67:                 + order.getQuantity()

  68:                 + "', AMOUNT='"

  69:                 + order.getAmount()

  70:                 + "', ORD_DATE= '"

  71:                 + (new SimpleDateFormat("dd-MMM-yyyy")).format(order

  72:                         .getOrdDate()) + "' WHERE SYMBOL='" + order.getSymbol()

  73:                 + "'";

  74:         System.err.println("UPDATE = " + update);

  75:         try

  76:         {

  77:             Statement stmt = getDbConnection().createStatement();

  78:             stmt.execute(update);

  79:             stmt.close();

  80:         }

  81:         catch (SQLException ex)

  82:         {

  83:             System.err.println("SQLException: " + ex.getMessage());

  84:         }

  85:     }

  86:  

  87:     public void insert(Order order)

  88:     {

  89:         String insert = "insert into Orders values('"

  90:                 + order.getSymbol()

  91:                 + "',"

  92:                 + order.getQuantity()

  93:                 + ","

  94:                 + order.getAmount()

  95:                 + ",'"

  96:                 + (new SimpleDateFormat("dd-MMM-yyyy")).format(order

  97:                         .getOrdDate()) + "')";

  98:         System.err.println("INSERT = " + insert);

  99:         try

 100:         {

 101:             Statement stmt = getDbConnection().createStatement();

 102:             stmt.execute(insert);

 103:             stmt.close();

 104:         }

 105:         catch (SQLException ex)

 106:         {

 107:             System.err.println("SQLException: " + ex.getMessage());

 108:         }

 109:     }

 110:  

 111:     public Connection getDbConnection()

 112:     {

 113:         return dbConnection;

 114:     }

 115:  

 116:     public void setDbConnection(Connection dbConnection)

 117:     {

 118:         this.dbConnection = dbConnection;

 119:     }

 120: }

Step 4: Now we need to register our publisher against a ContentHandler. In order to achieve that we need to create in our eclipse project a new class called CustomPushReplicationNamespaceContentHandler that should extend the com.oracle.coherence.patterns.pushreplication.configuration.PushReplicationNamespaceContentHandler. In the constructor of the new class we define a new handler for our custom publisher.


   1: package com.oracle.coherence;

   2:  

   3: import com.oracle.coherence.configuration.Configurator;

   4: import com.oracle.coherence.environment.extensible.ConfigurationContext;

   5: import com.oracle.coherence.environment.extensible.ConfigurationException;

   6: import com.oracle.coherence.environment.extensible.ElementContentHandler;

   7: import com.oracle.coherence.patterns.pushreplication.PublisherScheme;

   8: import com.oracle.coherence.environment.extensible.QualifiedName;

   9: import com.oracle.coherence.patterns.pushreplication.configuration.PushReplicationNamespaceContentHandler;

  10: import com.tangosol.run.xml.XmlElement;

  11:  

  12: public class CustomPushReplicationNamespaceContentHandler extends

  13:         PushReplicationNamespaceContentHandler

  14: {

  15:     public CustomPushReplicationNamespaceContentHandler()

  16:     {

  17:         super();

  18:         registerContentHandler("custom-publisher-scheme",

  19:                 new ElementContentHandler()

  20:                 {

  21:                     public Object onElement(ConfigurationContext context,

  22:                             QualifiedName qualifiedName, XmlElement xmlElement)

  23:                             throws ConfigurationException

  24:                     {

  25:                         PublisherScheme publisherScheme = new CustomPublisherScheme();

  26:                         Configurator.configure(publisherScheme, context,

  27:                                 qualifiedName, xmlElement);

  28:                         return publisherScheme;

  29:                     }

  30:                 });

  31:     }

  32: }


Step 5: Now we should define our CustomPublisher in the cache configuration file according to the following documentation.


   1: <cache-config

   2:     xmlns:sync="class:com.oracle.coherence.CustomPushReplicationNamespaceContentHandler"

   3:     xmlns:cr="class:com.oracle.coherence.environment.extensible.namespaces.InstanceNamespaceContentHandler">

   4:     <caching-schemes>

   5:         <sync:provider pof-enabled="false">

   6:             <sync:coherence-provider />

   7:         </sync:provider>

   8:         <caching-scheme-mapping>

   9:             <cache-mapping>

  10:                 <cache-name>publishing-cache</cache-name>

  11:                 <scheme-name>distributed-scheme-with-publishing-cachestore</scheme-name>

  12:                 <autostart>true</autostart>

  13:                 <sync:publisher>

  14:                     <sync:publisher-name>Active2 Publisher</sync:publisher-name>

  15:                     <sync:publisher-scheme>

  16:                         <sync:remote-cluster-publisher-scheme>

  17:                             <sync:remote-invocation-service-name>remote-site1

  18:                             </sync:remote-invocation-service-name>

  19:                             <sync:remote-publisher-scheme>

  20:                                 <sync:local-cache-publisher-scheme>

  21:                                     <sync:target-cache-name>publishing-cache

  22:                                     </sync:target-cache-name>

  23:                                 </sync:local-cache-publisher-scheme>

  24:                             </sync:remote-publisher-scheme>

  25:                             <sync:autostart>true</sync:autostart>

  26:                         </sync:remote-cluster-publisher-scheme>

  27:                     </sync:publisher-scheme>

  28:                 </sync:publisher>

  29:                 <sync:publisher>

  30:                     <sync:publisher-name>Active2-Output-Publisher</sync:publisher-name>

  31:                     <sync:publisher-scheme>

  32:                         <sync:stderr-publisher-scheme>

  33:                             <sync:autostart>true</sync:autostart>

  34:                             <sync:publish-original-value>true</sync:publish-original-value>

  35:                         </sync:stderr-publisher-scheme>

  36:                     </sync:publisher-scheme>

  37:                 </sync:publisher>

  38:                 <sync:publisher>

  39:                     <sync:publisher-name>Active2-JDBC-Publisher</sync:publisher-name>

  40:                     <sync:publisher-scheme>

  41:                         <sync:custom-publisher-scheme>

  42:                             <sync:jdbc-string>jdbc:oracle:thin:@machine_name:1521:XE

  43:                             </sync:jdbc-string>

  44:                             <sync:username>hr</sync:username>

  45:                             <sync:password>hr</sync:password>

  46:                         </sync:custom-publisher-scheme>

  47:                     </sync:publisher-scheme>

  48:                 </sync:publisher>

  49:             </cache-mapping>

  50:         </caching-scheme-mapping>

  51:         <!-- The following scheme is required for each remote-site when using a 

  52:             RemoteInvocationPublisher -->

  53:         <remote-invocation-scheme>

  54:             <service-name>remote-site1</service-name>

  55:             <initiator-config>

  56:                 <tcp-initiator>

  57:                     <remote-addresses>

  58:                         <socket-address>

  59:                             <address>localhost</address>

  60:                             <port>20001</port>

  61:                         </socket-address>

  62:                     </remote-addresses>

  63:                     <connect-timeout>2s</connect-timeout>

  64:                 </tcp-initiator>

  65:                 <outgoing-message-handler>

  66:                     <request-timeout>5s</request-timeout>

  67:                 </outgoing-message-handler>

  68:             </initiator-config>

  69:         </remote-invocation-scheme>

  70:         <!-- END: com.oracle.coherence.patterns.pushreplication -->

  71:         <proxy-scheme>

  72:             <service-name>ExtendTcpProxyService</service-name>

  73:             <acceptor-config>

  74:                 <tcp-acceptor>

  75:                     <local-address>

  76:                         <address>localhost</address>

  77:                         <port>20002</port>

  78:                     </local-address>

  79:                 </tcp-acceptor>

  80:             </acceptor-config>

  81:             <autostart>true</autostart>

  82:         </proxy-scheme>

  83:     </caching-schemes>

  84: </cache-config>

As you can see in the red-marked text from above I've:
       - set new Namespace Content Handler
       - define the new custom publisher that should work together with other publishers like: stderr and remote publishers in our case.

Step 6: Add the com.oracle.coherence.CustomPublisherScheme to your custom-pof-config file:


   1: <pof-config>

   2:     <user-type-list>

   3:         <!-- Built in types -->

   4:         <include>coherence-pof-config.xml</include>

   5:         <include>coherence-common-pof-config.xml</include>

   6:         <include>coherence-messagingpattern-pof-config.xml</include>

   7:         <include>coherence-pushreplicationpattern-pof-config.xml</include>

   8:         <!-- Application types -->

   9:         <user-type>

  10:             <type-id>1901</type-id>

  11:             <class-name>com.oracle.coherence.Order</class-name>

  12:             <serializer>

  13:                 <class-name>com.oracle.coherence.OrderSerializer</class-name>

  14:             </serializer>

  15:         </user-type>

  16:         <user-type>

  17:             <type-id>1902</type-id>

  18:             <class-name>com.oracle.coherence.CustomPublisherScheme</class-name>

  19:         </user-type>

  20:     </user-type-list>

  21: </pof-config>

CONCLUSIONSThis approach allows for publishers to publish data to almost any other receiver (database, JMS, MQ, ...). The only thing that needs to be changed is the DatabasePersistence.java class that should be adapted to the chosen receiver. Only minor changes are needed for the rest of the code (to publishBatch method from CustomPublisher class).