Java program for posting messages to Weblogic JMS (Java Messaging Service) Queue

Java

Video is ready, Click Here to View ×


Java program for posting messages to Weblogic JMS (Java Messaging Service) Queue

9 thoughts on “Java program for posting messages to Weblogic JMS (Java Messaging Service) Queue

  1. How to consume messages from queue.

    – we have one webLogic queue, which has 8 nodes clustered.

    – Source application sends the message to webLogic queue.

    – Using some algorithm webLogic places the messages in 8 different nodes.

    – each node url like t3://localhost1:7003; t3://localhost2:7003,… t3://localhost8:7003

    – In client, we have developed 8 java program, each java program is replica of same program, except one difference. (using onMessage method to consumer message)

    program1.java try to connect t3://localhost1:7003, program2.java to t3://localhost2:7003….program8.java tp t3://localhost8:7003

    – Here is the 1st problem: These t3 urls are clustered URL, even program1 try to connect host 1, there is no guarantee that it will connect to host1

    – server side has clustered 8 nodes

    – client side is having 8 different java program each try to connect 1 UNIQUE node.

    – These 8 programs are "single process and single thread".

    – First challenge is, I need to connect 8 client consumer program to 8 different nodes evenly

    – With in a minute I can start all 8 programs, but the result is, instead of obtaining connection with each node, only few nodes are connected by client programs

    – Example: node1 can get 3 connection, node2 can get 1, node4 can get 4 rest all node no connection.

    – I have solved this problem by starting thread one by one with 3 min interval ( This is not recommended or good practice, but problem solved )

    – Now source application send more message, need to start more consumers. My plan is to have 2 connection per node.

    – but when I try to start 2nd set of 8 programa with same 3 min interval, the clients are not getting even connection.

    – some node will have uneven number of connections others will left with only one connection

    – How we can get even number of connection

  2. here is the QueuePoster class, if anyone needs it:

    package com.ismaelOmar.weblogic;

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.Hashtable;

    import javax.jms.JMSException;
    import javax.jms.Queue;
    import javax.jms.QueueConnection;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.QueueSender;
    import javax.jms.QueueSession;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;

    public class QueuePoster {
    public final static String SERVER="t3://localhost:7001";
    public final static String JNDI_FACTORY="weblogic.jndi.WLInitialContextFactory";
    public final static String JMS_FACTORY="com/Ismael/weblogic/base/cf"; //Change this
    public final static String QUEUE="com/Ismael/weblogic/base/dq"; //Change this

    private QueueConnectionFactory queueConnectionFactory;
    private QueueConnection queueConnection;
    private QueueSession queueSession;
    private QueueSender queueSender;
    private Queue queue;
    private TextMessage message;

    public void init(Context context, String queueName) throws NamingException, JMSException{
    queueConnectionFactory = (QueueConnectionFactory) context.lookup(JMS_FACTORY);
    queueConnection = queueConnectionFactory.createQueueConnection();
    queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    queue = (Queue) context.lookup(queueName);
    queueSender = queueSession.createSender(queue);
    message = queueSession.createTextMessage();
    queueConnection.start();
    }

    public void post(String msg) throws JMSException {
    message.setText(msg);
    queueSender.send(message);

    }

    public void close() throws JMSException{
    queueSender.close();
    queueSession.close();
    queueConnection.close();
    }

    private static void sendToServer (QueuePoster queuePoster) throws IOException, JMSException {
    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
    boolean readFlag=true;
    System.out.println("Enter messages to send to Weblogic server (Enter quit to end):");
    while(readFlag) {
    System.out.println("Enter Message:");
    String msg=bufferedReader.readLine();
    if(msg.equals("quit")) {
    queuePoster.post(msg);
    System.exit(0);
    }
    queuePoster.post(msg);
    System.out.println();

    }
    }

    private static InitialContext getInitialContext() throws NamingException
    {
    Hashtable<String, String> env = new Hashtable<String, String>();
    env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
    env.put(Context.PROVIDER_URL, SERVER);
    return new InitialContext(env);
    }

    public static void main(String[] args) throws Exception{
    InitialContext initialContext = getInitialContext();
    QueuePoster queuePoster = new QueuePoster();
    queuePoster.init(initialContext, QUEUE);
    sendToServer(queuePoster);
    queuePoster.close();
    }

    }

  3. package com.ghoshalacademy.weblogic;

    import java.util.Hashtable;

    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.Queue;
    import javax.jms.QueueConnection;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.QueueReceiver;
    import javax.jms.QueueSession;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;

    public class QueueReader implements MessageListener{

    public final static String SERVER="t3://localhost:7001";
    public final static String JNDI_FACTORY="weblogic.jndi.WLInitialContextFactory";
    public final static String JMS_FACTORY="com.ghoshal.weblogic.base.cf";
    public final static String QUEUE="com.ghoshal.weblogic.base.dq";

    private QueueConnectionFactory queueConnectionFactory;
    private QueueConnection queueConnection;
    private QueueSession queueSession;
    private QueueReceiver queueReceiver;
    private Queue queue;
    private Boolean quit=false;

    public void init(Context context, String queueName) throws NamingException, JMSException {
    queueConnectionFactory = (QueueConnectionFactory) context.lookup(JMS_FACTORY);
    queueConnection = queueConnectionFactory.createQueueConnection();
    queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    queue = (Queue) context.lookup(queueName);
    queueReceiver = queueSession.createReceiver(queue);
    queueReceiver.setMessageListener(this);
    queueConnection.start();
    }

    public void close() throws JMSException {
    queueReceiver.close();
    queueSession.close();
    queueConnection.close();
    }

    private static InitialContext getInitialContext() throws NamingException
    {
    Hashtable<String, String> env = new Hashtable<String, String>();
    env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
    env.put(Context.PROVIDER_URL, SERVER);
    return new InitialContext(env);
    }

    public static void main(String[] args) throws Exception {
    InitialContext initialContext = getInitialContext();
    QueueReader queueReader = new QueueReader();
    queueReader.init(initialContext, QUEUE);
    System.out.println("Waiting to receive messages");
    synchronized(queueReader){
    while(!queueReader.quit) {
    try {
    queueReader.wait();
    }catch(InterruptedException ie){}
    }
    queueReader.close();
    }
    }

    @Override
    public void onMessage(Message msg) {
    try {
    String msgText;
    if (msg instanceof TextMessage) {
    msgText = ((TextMessage)msg).getText();
    } else {
    msgText = msg.toString();
    }

    System.out.println("Message Received: "+ msgText );

    if (msgText.equalsIgnoreCase("quit")) {
    synchronized(this) {
    quit = true;
    this.notifyAll(); // Notify main thread to quit
    }
    }
    } catch (JMSException jmsException) {
    System.err.println("Exception: "+jmsException.getMessage());
    }
    }
    }

  4. package com.ghoshalacademy.weblogic;

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.Hashtable;

    import javax.jms.JMSException;
    import javax.jms.Queue;
    import javax.jms.QueueConnection;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.QueueSender;
    import javax.jms.QueueSession;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;

    public class QueuePoster {

    public final static String SERVER="t3://localhost:7001";
    public final static String JNDI_FACTORY="weblogic.jndi.WLInitialContextFactory";
    public final static String JMS_FACTORY="com.ghoshal.weblogic.base.cf";
    public final static String QUEUE="com.ghoshal.weblogic.base.dq";

    private QueueConnectionFactory queueConnectionFactory;
    private QueueConnection queueConnection;
    private QueueSession queueSession;
    private QueueSender queueSender;
    private Queue queue;
    private TextMessage message;

    public void init(Context context, String queueName) throws NamingException, JMSException {
    queueConnectionFactory = (QueueConnectionFactory) context.lookup(JMS_FACTORY);
    queueConnection = queueConnectionFactory.createQueueConnection();
    queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    queue = (Queue) context.lookup(queueName);
    queueSender = queueSession.createSender(queue);
    message = queueSession.createTextMessage();
    queueConnection.start();
    }

    public void post(String msg) throws JMSException {
    message.setText(msg);
    queueSender.send(message);
    }

    public void close() throws JMSException {
    queueSender.close();
    queueSession.close();
    queueConnection.close();
    }

    private static void sendToServer(QueuePoster queuePoster) throws IOException, JMSException {
    BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(System.in));
    boolean readFlag=true;
    System.out.println("Enter messages to send to weblogic server (Enter quit to end):n");
    while(readFlag) {
    System.out.print("Enter Message:");
    String msg=bufferedReader.readLine();
    if(msg.equals("quit")) {
    queuePoster.post(msg);
    System.exit(0);
    }
    queuePoster.post(msg);
    System.out.println();
    }
    bufferedReader.close();
    }

    private static InitialContext getInitialContext() throws NamingException
    {
    Hashtable<String, String> env = new Hashtable<String, String>();
    env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
    env.put(Context.PROVIDER_URL, SERVER);
    return new InitialContext(env);
    }

    public static void main(String[] args) throws Exception {
    InitialContext initialContext = getInitialContext();
    QueuePoster queuePoster = new QueuePoster();
    queuePoster.init(initialContext, QUEUE);
    sendToServer(queuePoster);
    queuePoster.close();
    }

    }

Leave a Reply

Your email address will not be published. Required fields are marked *