WSO2 ESB Feed Connector

Atom is the name of an XML-based Web content and metadata syndication format, and an application-level protocol for publishing and editing Web resources belonging to periodically updated websites. All Atom feeds must be well-formed XML documents, and are identified with the application/atom+xml media type.

To use the Feed connector, we first need a server on the back end that can receive the connector's requests. A sample server implementation is walked through in the section "Getting Started with Simple Apache Abdera Server" below.

Create New Feed

Creating a new feed entry is handled by the CreateFeed operation:


<feed.CreateFeed>
    <HostAddress>{$ctx:HostAddress}</HostAddress>
    <Title>{$ctx:Title}</Title>
    <Content>{$ctx:Content}</Content>
    <Author>{$ctx:Author}</Author>
    <FeedID>{$ctx:FeedID}</FeedID>
</feed.CreateFeed>

Properties

  • HostAddress: URL of the service (e.g. http://localhost:9002/FeedConnector)
  • Title: Title of the feed entry we are creating (e.g. This is the title)
  • Content: Feed content or summary (e.g. Hello World)
  • Author: Owner of the feed (e.g. Rajjaz)
  • FeedID: Unique ID of the feed (optional; in most feed services the server will handle this, e.g. WSO2Inc:Rajjaz,Feed)


Edit Feed

Editing an existing feed entry by ID is handled by the EditFeed operation:

<feed.EditFeed>
    <HostAddress>{$ctx:HostAddress}</HostAddress>
    <Title>{$ctx:NewTitle}</Title>
    <EntryID>{$ctx:EntryID}</EntryID>
    <Content>{$ctx:Content}</Content>
    <Author>{$ctx:Author}</Author>
    <FeedID>{$ctx:FeedID}</FeedID>
</feed.EditFeed>

Properties

  • HostAddress: URL of the service (e.g. http://localhost:9002/FeedConnector)
  • EntryID: Unique ID assigned to the entry by the feed server
  • Title: New title of the feed entry (e.g. This is the New title)
  • Content: Feed content or summary (e.g. New Hello World)
  • Author: Owner of the feed (e.g. Rajjaz)
  • FeedID: Unique ID of the feed (optional; in most feed services the server will handle this, e.g. WSO2Inc:Rajjaz,Feed)
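
A proxy service for EditFeed can be built along the same lines as the CreateFeed sample shown under "Sample configuration" below. A minimal sketch (the proxy name and the property expressions are illustrative):

<proxy xmlns="http://ws.apache.org/ns/synapse" name="Feed_EditEntry" transports="https"
       statistics="disable" trace="disable" startOnLoad="true">
   <target>
      <inSequence onError="faultHandlerSeq">
         <!-- Read the operation inputs from the incoming request body -->
         <property name="HostAddress" expression="//HostAddress/text()"/>
         <property name="NewTitle" expression="//NewTitle/text()"/>
         <property name="EntryID" expression="//EntryID/text()"/>
         <property name="Content" expression="//Content/text()"/>
         <property name="Author" expression="//Author/text()"/>
         <property name="FeedID" expression="//FeedID/text()"/>
         <feed.EditFeed>
            <HostAddress>{$ctx:HostAddress}</HostAddress>
            <Title>{$ctx:NewTitle}</Title>
            <EntryID>{$ctx:EntryID}</EntryID>
            <Content>{$ctx:Content}</Content>
            <Author>{$ctx:Author}</Author>
            <FeedID>{$ctx:FeedID}</FeedID>
         </feed.EditFeed>
         <respond/>
      </inSequence>
      <outSequence>
         <send/>
      </outSequence>
   </target>
   <description/>
</proxy>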

Delete Feed

Deleting an existing feed entry by ID is handled by the DeleteFeed operation:


<feed.DeleteFeed>
    <HostAddress>{$ctx:HostAddress}</HostAddress>
    <EntryID>{$ctx:EntryID}</EntryID>
</feed.DeleteFeed>

Properties

  • HostAddress: URL of the service (e.g. http://localhost:9002/FeedConnector).
  • EntryID: Unique ID assigned to the entry by the feed server.
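
For reference, the DeleteFeed operation only needs the service URL and the entry's server-assigned ID. With the Abdera sample server described below, resource names take the form ID-Title with spaces replaced by underscores, so an illustrative input payload would be:

<DeleteFeed>
    <HostAddress>http://localhost:9002/FeedConnector</HostAddress>
    <EntryID>1000-This_is_the_title</EntryID>
</DeleteFeed>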

Sample configuration

Following is a sample proxy service that illustrates how to connect to the Feed connector with the CreateFeed operation.


<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse" name="Feed_CreateEntry" transports="https"
       statistics="disable" trace="disable" startOnLoad="true">
   <target>
      <inSequence onError="faultHandlerSeq">
         <property name="HostAddress" expression="//HostAddress/text()"/>
         <property name="Title" expression="//Title/text()"/>
         <property name="Content" expression="//Content/text()"/>
         <property name="Author" expression="//Author/text()"/>
         <property name="FeedID" expression="//FeedID/text()"/>
         <feed.CreateFeed>
            <HostAddress>{$ctx:HostAddress}</HostAddress>
            <Title>{$ctx:Title}</Title>
            <Content>{$ctx:Content}</Content>
            <Author>{$ctx:Author}</Author>
            <FeedID>{$ctx:FeedID}</FeedID>
         </feed.CreateFeed>
         <respond/>
      </inSequence>
      <outSequence>
         <send/>
      </outSequence>
   </target>
   <description/>
</proxy>
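
The proxy reads the operation inputs from the request body using relative XPath expressions, so the root element name of the payload does not matter. A matching request payload would look like this (the values are illustrative):

<CreateFeed>
    <HostAddress>http://localhost:9002/FeedConnector</HostAddress>
    <Title>This is the title</Title>
    <Content>Hello World</Content>
    <Author>Rajjaz</Author>
    <FeedID>WSO2Inc:Rajjaz,Feed</FeedID>
</CreateFeed>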

    Getting Started with Simple Apache Abdera Server

Apache Abdera provides an implementation of the IETF Atom Syndication Format and Atom Publishing Protocol standards (RFCs 4287 and 5023). In this post we're going to walk through building an Atom Publishing Protocol server sample using Abdera's concepts of Providers and Collection Adapters. If you remember your AtomPub basics, you'll recall that AtomPub services are organized hierarchically, such that we have:

• Services - A grouping of Workspaces
• Workspaces - A grouping of Collections
• Collections - An Atom feed which contains Atom entries. Entries can be created, deleted, updated, etc. through the HTTP methods and the mapping that AtomPub defines. (A sample service document illustrating this hierarchy follows this list.)
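
To make the hierarchy concrete, here is roughly what the AtomPub service document for the sample server built below would look like. This is a sketch based on the workspace and collection names used later, not captured server output:

<service xmlns="http://www.w3.org/2007/app"
         xmlns:atom="http://www.w3.org/2005/Atom">
   <workspace>
      <atom:title>Feed Update</atom:title>
      <collection href="FeedConnector">
         <atom:title>Feed Connector feeds</atom:title>
         <accept>application/atom+xml;type=entry</accept>
      </collection>
   </workspace>
</service>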

Let's build a simple Abdera server sample for our testing. Create a simple Java project in your IDE and add the dependencies below to your pom file to provide the server functionality and the Abdera libraries.

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atomfeed.connector</groupId>
    <artifactId>ATOMPubServer</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <name>ATOMPubServer</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <!-- Embedded Jetty server and servlet support -->
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-servlet</artifactId>
            <version>7.4.5.v20110725</version>
        </dependency>
        <!-- Abdera server framework -->
        <dependency>
            <groupId>org.apache.abdera</groupId>
            <artifactId>abdera-server</artifactId>
            <version>1.1.2</version>
        </dependency>
    </dependencies>
</project>


Let's begin our code by setting up the URL space. Since we are using an embedded Jetty server for our sample, we need to initialize the server and the service URL for the service we are creating.



In this code we're setting up a Provider which contains an Atom workspace, which in turn contains our Atom collection.
When we initialize our CollectionAdapter we call setHref; Abdera uses this to determine the URL space.

      App.java


package com.atomfeed.connector.server;

import org.apache.abdera.protocol.server.Provider;
import org.apache.abdera.protocol.server.impl.DefaultProvider;
import org.apache.abdera.protocol.server.impl.SimpleWorkspaceInfo;
import org.apache.abdera.protocol.server.servlet.AbderaServlet;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;

public class App {

    public static void main(String... args) throws Exception {
        // Default to port 9002; allow overriding via the first argument.
        int port = 9002;
        try {
            port = args.length > 0 ? Integer.parseInt(args[0]) : 9002;
        } catch (Exception e) {
            // Ignore a malformed argument and keep the default port.
        }

        // Start an embedded Jetty server and mount the Abdera servlet at the root.
        Server server = new Server(port);
        ServletContextHandler context = new ServletContextHandler(server, "/", ServletContextHandler.SESSIONS);
        ServletHolder servletHolder = new ServletHolder(new EmployeeProviderServlet());
        context.addServlet(servletHolder, "/*");
        server.start();
        server.join();
    }

    public static final class EmployeeProviderServlet extends AbderaServlet {
        @Override
        protected Provider createProvider() {
            // The collection adapter; setHref determines the collection's URL space.
            EmployeeCollectionAdapter ca = new EmployeeCollectionAdapter();
            ca.setHref("FeedConnector");

            // Group our collection into a workspace.
            SimpleWorkspaceInfo wi = new SimpleWorkspaceInfo();
            wi.setTitle("Feed Update");
            wi.addCollection(ca);

            // The provider ties workspaces together and serves them from "/".
            DefaultProvider provider = new DefaultProvider("/");
            provider.addWorkspace(wi);

            provider.init(getAbdera(), null);
            return provider;
        }
    }
}


      Abdera comes with a class called the AbstractEntityCollectionAdapter. This class provides a simple "fill in the blanks" approach so that you can easily map your concepts to an AtomPub collection. The idea is that you have an entity class which represents each individual entry. For instance, if you were writing a blog, you would have a BlogEntry class which backs the entry. Or in our case of an employee directory, it would be an Employee class. Abdera will then provide some template methods which you fill in to give Abdera the necessary metadata.
      Our Atom Collection is going to be a simple store of employees which can be added to, deleted from, updated, etc. To get started, we must first build our Employee class:
package com.atomfeed.connector.server;

import java.util.Date;

public class Employee {
    private int id;          // Unique identifier, assigned by the server
    private String content;  // Entry content
    private String title;    // Entry title
    private Date updated;    // Last-updated timestamp for the Atom entry

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public Date getUpdated() {
        return updated;
    }

    public void setUpdated(Date updated) {
        this.updated = updated;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getTitle() {
        return title;
    }
}

The ID is going to be used as a unique identifier for our Employee so that even if the employee's name changes, we can tie it to a previous Atom entry and track the changes. The updated date will be used in our Atom feed so that consumers know whether there were changes and when they occurred; it is also what a client checks to pick up the most recently updated entries.

The first thing we'll do is provide a HashMap to store our Employees in. Typically this would be a database or some other type of back-end store; to keep things simple, we're going to store employees in a Map keyed by their employee ID.
Next up, we need to implement some methods which provide basic metadata about our collection, such as the feed author, the feed ID, and the feed title.
The ID returned by getId(RequestContext) must be a unique identifier for your feed. The idea is that even if your feed location changes, or someone is redistributing your feed, a feed reader can use this ID to recognize that two feeds are exactly the same. The sample below uses a tag URI ("tag:wso2.com,2015,feed:entry:"), which is a common convention for constructing stable feed IDs.
Walking through this we have:
• getId: this ID uniquely identifies an entry. It follows the same rules as the feed ID.
• getTitle: the title of the entry.
• getUpdated: the time the entry was last updated.
• getAuthors: the author of the entry. It is OK to leave this empty provided that an author is supplied for the feed.
• getContent: the actual content of the entry that will be displayed in the feed; in this case it is the entry's stored content. You can return a Content or String object from this method. Using the Content object gives you control over whether or not the content is rendered as HTML. Alternatively you could provide a summary by overriding getSummary. At least one of getContent and getSummary must return a non-null value.

        Mapping Entries to Resources

        The AbstractEntityCollectionAdapter maps individual entries via a "resource name." You must provide a way to create a resource name for your entry and also provide a way to look up an entry from your resource name. In this case, our entry is an Employee.

When we do this we must ensure that we won't have conflicts, which means we don't want to use the employee name alone as the resource name. However, we don't necessarily want to use just the employee ID either, as the resulting URL isn't very friendly. So we'll create a hybrid of the form "EMPLOYEE_ID-EMPLOYEE_NAME", for example 1000-This_is_the_title.


Note how getName makes the resource name URL-friendly by replacing spaces with underscores; a production adapter should also sanitize any other characters that are invalid in a URL (Abdera ships a Sanitizer utility for this).
The POST method (postEntry) corresponds to creating an Employee in the database. Here we're:
• creating a new Employee object,
• generating a new ID for it,
• setting the title and content of the Employee from the Atom entry (in a more advanced case, you would want to store the Employee information in an XML structure or an HTML microformat), and
• storing the Employee in our HashMap.

The putEntry method is similar; in this case we're just updating the entry's title and content.

          Finally, we have the deleteEntry method which gives us the resource name and allows us to remove it from the Map.

package com.atomfeed.connector.server;

import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.abdera.Abdera;
import org.apache.abdera.factory.Factory;
import org.apache.abdera.i18n.iri.IRI;
import org.apache.abdera.model.Content;
import org.apache.abdera.model.Person;
import org.apache.abdera.protocol.server.RequestContext;
import org.apache.abdera.protocol.server.context.ResponseContextException;
import org.apache.abdera.protocol.server.impl.AbstractEntityCollectionAdapter;

public class EmployeeCollectionAdapter extends AbstractEntityCollectionAdapter<Employee> {
    private static final String ID_PREFIX = "tag:wso2.com,2015,feed:entry:";

    private AtomicInteger nextId = new AtomicInteger(1000);
    private Map<Integer, Employee> employees = new HashMap<Integer, Employee>();
    private Factory factory = new Abdera().getFactory();

    // Unique, stable identifier for the feed itself.
    @Override
    public String getId(RequestContext request) {
        return ID_PREFIX;
    }

    public String getTitle(RequestContext request) {
        return "Feed Connector feeds";
    }

    @Override
    public String getAuthor(RequestContext request) {
        return "WSO2 Inc";
    }

    @Override
    public Iterable<Employee> getEntries(RequestContext request) {
        return employees.values();
    }

    @Override
    public Employee getEntry(String resourceName, RequestContext request) throws ResponseContextException {
        Integer id = getIdFromResourceName(resourceName);
        return employees.get(id);
    }

    // Resource names have the form "ID-TITLE"; the numeric prefix is the map key.
    private Integer getIdFromResourceName(String resourceName) throws ResponseContextException {
        int idx = resourceName.indexOf("-");
        if (idx == -1) {
            throw new ResponseContextException(404);
        }
        return Integer.valueOf(resourceName.substring(0, idx));
    }

    @Override
    public String getName(Employee entry) {
        // Replace spaces so the resource name is URL-friendly.
        return entry.getId() + "-" + entry.getTitle().replaceAll(" ", "_");
    }

    @Override
    public String getId(Employee entry) {
        return ID_PREFIX + entry.getId();
    }

    @Override
    public String getTitle(Employee entry) {
        return entry.getTitle();
    }

    @Override
    public Date getUpdated(Employee entry) {
        return entry.getUpdated();
    }

    @Override
    public List<Person> getAuthors(Employee entry, RequestContext request) throws ResponseContextException {
        Person author = request.getAbdera().getFactory().newAuthor();
        author.setName("WSO2 Inc");
        return Arrays.asList(author);
    }

    @Override
    public Object getContent(Employee entry, RequestContext request) {
        Content content = factory.newContent(Content.Type.TEXT);
        content.setText(entry.getContent());
        return content;
    }

    // POST: create a new entry and store it in the map.
    @Override
    public Employee postEntry(String title,
                              IRI id,
                              String summary,
                              Date updated,
                              List<Person> authors,
                              Content content,
                              RequestContext request) throws ResponseContextException {
        Employee employee = new Employee();
        employee.setContent(content.getText().trim());
        employee.setId(nextId.getAndIncrement());
        employee.setTitle(title);
        employee.setUpdated(updated);
        employees.put(employee.getId(), employee);

        return employee;
    }

    // PUT: update the title and content of an existing entry.
    @Override
    public void putEntry(Employee employee,
                         String title,
                         Date updated,
                         List<Person> authors,
                         String summary,
                         Content content,
                         RequestContext request) throws ResponseContextException {
        employee.setTitle(title);
        employee.setContent(content.getText().trim());
        employee.setUpdated(updated);
    }

    // DELETE: remove the entry identified by the resource name.
    @Override
    public void deleteEntry(String resourceName, RequestContext request) throws ResponseContextException {
        Integer id = getIdFromResourceName(resourceName);
        employees.remove(id);
    }
}


After completing the development, run the application as a Java application; you will see console output like the following:


          2015-10-19 11:33:16.825:INFO::jetty-7.4.5.v20110725
          2015-10-19 11:33:16.862:INFO::started o.e.j.s.ServletContextHandler{/,null}
          2015-10-19 11:33:16.985:INFO::Started SelectChannelConnector@0.0.0.0:9002 STARTING



Later we can check the output of this server after building a client for it. You can download the full source code and contribute to its further development.
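
To smoke-test the server without the ESB, you can POST an Atom entry straight to the collection; per AtomPub, the server responds with 201 Created and the created entry. The payload below is illustrative:

POST /FeedConnector HTTP/1.1
Host: localhost:9002
Content-Type: application/atom+xml;type=entry

<entry xmlns="http://www.w3.org/2005/Atom">
    <title>This is the title</title>
    <author><name>Rajjaz</name></author>
    <content type="text">Hello World</content>
</entry>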

          Getting Started with Simple WSO2 ESB Custom Inbound Endpoint

          WSO2 ESB supports several inbound endpoints, but there can be scenarios that require functionality not provided by the existing inbound endpoints. For example, you might need an inbound endpoint to connect to a certain back-end server or vendor specific protocol.

To support such scenarios, you can write your own custom inbound endpoint by extending inbound polling or inbound listening. Let's build a simple sample custom inbound endpoint.

The sample source code is available for download.

Below is a simple custom inbound skeleton; we can implement our own inbound endpoint on top of this code.


package org.wso2.carbon.inbound.custom.poll;

import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.core.SynapseEnvironment;
import org.wso2.carbon.inbound.endpoint.protocol.generic.GenericPollingConsumer;

public class SamplePollingClient extends GenericPollingConsumer {

    private static final Log log = LogFactory.getLog(SamplePollingClient.class);

    /**
     * @param properties
     * @param name
     * @param synapseEnvironment
     * @param scanInterval
     * @param injectingSeq
     * @param onErrorSeq
     * @param coordination
     * @param sequential
     */
    public SamplePollingClient(Properties properties, String name,
                               SynapseEnvironment synapseEnvironment, long scanInterval,
                               String injectingSeq, String onErrorSeq, boolean coordination,
                               boolean sequential) {
        super(properties, name, synapseEnvironment, scanInterval, injectingSeq, onErrorSeq,
              coordination, sequential);
        log.info("Initialized the custom polling consumer.");
    }

    @Override
    public Object poll() {
        // TODO: implement the polling logic here
        log.info("Inside the execute method.");
        return null;
    }

    /**
     * Stopping the inbound endpoint
     */
    public void destroy() {
        // TODO: implement the cleanup logic here
        log.info("Inside the destroy method, destroying the polling inbound ...");
    }
}

After downloading the source code, build it; you will get a .jar file. Copy the file into the dropins folder (/../../wso2esb-4.9.0/repository/components/dropins), then restart the ESB.

The SamplePollingClient constructor initializes and sets up the running environment; this is where we set the prerequisites for our custom inbound endpoint.

poll is the method in which we implement our logic; the inbound endpoint calls this method at the given time interval.

destroy is the method that contains the logic to clean up and free the resources the inbound endpoint used while it was active (cleaning the registry, etc.).
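
As an illustration of what poll might do, here is a minimal sketch that could replace the stub in SamplePollingClient above. It fabricates a payload and injects it into the configured sequence, assuming the injectMessage(String, String) helper inherited from GenericPollingConsumer; in a real consumer the payload would be fetched from the back-end system.

@Override
public Object poll() {
    // Minimal sketch: in a real consumer this payload would be polled
    // from the back-end system instead of being hard-coded here.
    String payload = "<event><message>Hello from SamplePollingClient</message></event>";
    // injectMessage(...) is inherited from GenericPollingConsumer and
    // injects the payload into the sequence configured for this endpoint.
    injectMessage(payload, "application/xml");
    return null;
}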

Sample Inbound Endpoint Configuration

<inboundEndpoint name="class" sequence="request" onError="fault"
                 class="org.wso2.carbon.inbound.custom.poll.SamplePollingClient" suspend="false">
   <parameters>
      <parameter name="sequential">true</parameter>
      <parameter name="interval">2000</parameter>
      <parameter name="coordination">true</parameter>
   </parameters>
</inboundEndpoint>


          Output
          ===================
          Initialized the custom polling consumer.
          Inside the execute method.
          Inside the execute method.
          Inside the execute method.
          Inside the execute method.
          Inside the execute method.
          .................
          .....
          ...
               
           
When the inbound endpoint is destroyed:

          Inside the destroy method, destroying the polling inbound ...

          WSO2 ESB Inbound Endpoint

An inbound endpoint is an endpoint that resides on the server side of the ESB. Inbound endpoints were introduced to provide features such as multi-tenancy support for protocols like JMS, bypassing the Axis2 layer on the inbound side, and deploying and undeploying inbound endpoints without restarting the server.
There are basically two behaviours, and the behaviour is selected according to the protocol:
• Polling behaviour
• Listening behaviour

Polling behaviour
This behaviour is used by the JMS, File, and Generic inbound endpoints. The endpoint periodically polls for data under certain conditions and, if the conditions are met, injects that data into the given sequence. For example, the JMS inbound endpoint periodically checks a JMS queue for available data; if a message is available, it takes the message and injects it into the given sequence. These inbound endpoints therefore have one-way operation: no result is required, so they are asynchronous in nature.

Listening behaviour
This behaviour is used by the HTTP and CXF-RS inbound endpoints. A listener is started on a given port and listens for requests coming to that port. When a request arrives, it is injected into the given sequence. These endpoints have two-way operation and are synchronous in nature.

          Inbound Endpoint Configuration  

          <inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
                          name="feedep"
                          sequence="TestIn"
                          onError="fault"
                          protocol="http"
                          suspend="false">
            <parameters>
               <parameter name="inbound.http.port">8085</parameter>
            </parameters>
          </inboundEndpoint>

As the above configuration shows, generic parameters for inbound endpoints are added as attributes of the inboundEndpoint element, and protocol-specific parameters are given inside the parameters element.
           
          Thread Pools in InboundEndpoint

We need a thread pool to execute message injectors into the sequence. Handling message injection with a thread pool avoids blocking the message consumers or listeners for the source.
There are three options that can be taken:
• a thread pool per inbound endpoint,
• a protocol-specific thread pool, or
• one thread pool for all inbound endpoints.
Currently there is a thread pool per inbound endpoint, and this needs to be made configurable.

Multi Tenancy in InboundEndpoint
Multi-tenancy means multiple tenants operate in a single physical location in a logically separated manner, and each tenant can have multiple inbound endpoints. Introducing this raises major problems that need to be solved. One-way protocols such as JMS, File, and Generic run on top of tasks, and since tasks already handle multi-tenancy we don't need to handle it separately.
Listening endpoints such as HTTP and CXF-RS, however, do need to handle multi-tenancy. The basic problems we faced are port sharing and tenant unloading. We need port sharing between tenants because the number of ports is limited. When a tenant is loaded or unloaded, its Synapse environment is also loaded or unloaded. So we keep a common handler and listener always running, and at runtime we obtain the Synapse environment, retrieve the inbound endpoints through it, and perform the relevant operations.

Clustering
This is achieved by using the Hazelcast framework and distributed locks.

