import java.io.*; import java.net.*; import java.util.*; import com.streambase.sb.*; import com.streambase.sb.adapter.*; import com.streambase.sb.operator.*; /** * This output adapter opens an HTTP port to listen for client connections, * and whenever it receives a tuple it emits a server-side-event to all * connected clients. * * See http://whatwg.org/specs/web-apps/current-work/#scs-server-sent * for more information on server-side-events. */ public class SSE extends Adapter implements OutputAdapter, Parameterizable { //// Properties private int _port = 9999; public int getPort() { return _port; } public void setPort(int port) { _port = port; } private String _defaultEvent = "event"; public String getDefaultEvent() { return _defaultEvent; } public void setDefaultEvent(String e) { _defaultEvent = e; } //// Adapter interface public SSE() { setPortHints(1, 0); } public void typecheck() throws TypecheckException { requireInputPortCount(1); if (_port <= 0 || _port >= 65536) throw new TypecheckException("Invalid port: " + _port); } public void init() throws StreamBaseException { super.init(); listenForConnections(); } public void processTuple(int port, Tuple tuple) throws StreamBaseException { try { emitEvent(tuple); } catch (Exception t) { t.printStackTrace(); } } //// Implementation private List connections = new ArrayList(); private void listenForConnections() throws StreamBaseException { connections.add(System.out); registerRunnable(new Runnable() { public void run() { try { ServerSocket serverSocket = new ServerSocket(_port); while (shouldRun()) { Socket socket = serverSocket.accept(); OutputStream os = socket.getOutputStream(); // send HTTP headers PrintStream ps = new PrintStream(os); ps.print("HTTP/1.1 200 OK\r\n"); ps.print("Cache-Control: no-cache\r\n"); ps.print("Pragma: no-cache\r\n"); ps.print("Content-Type: application/x-dom-event-stream\r\n"); ps.print("\r\n"); synchronized (connections) { connections.add(os); } } } catch (IOException e) { // TODO: shut down the server e.printStackTrace(); } } }); } private void emitEvent(Tuple tuple) { // let the Event field default to the name of the adapter String event = _defaultEvent; try { event = tuple.getString("Event"); } catch (StreamBaseException e) { /* ignore */ } writeKeyValuePair("Event", event); Schema.Field fields[] = getInputSchema(0).getFields(); for (int i=0; i < fields.length; i++) { Schema.Field field = fields[i]; if (field.getName().equals("Event")) continue; Object val = tuple.getField(field); String value = (val==null) ? "null" : val.toString(); writeKeyValuePair(field.getName(), value); } writeText("\n"); } private void writeKeyValuePair(String key, String value) { writeText(key); writeText(": "); writeText(value); writeText("\n"); } private void writeText(String text) { synchronized(connections) { for (OutputStream os : connections) { try { if (text == null) text = "null"; os.write(text.getBytes()); } catch (IOException e) { // TODO: remove connection from the list } } } } }