# GraphQL::Streaming

Experimental tools for GraphQL over a long-lived connection, including `subscription`, `@defer` and `@stream`. ([demo](https://github.com/rmosolgo/graphql-ruby-stream-defer-demo))

## Installation

```ruby
# Requires an experimental branch of graphql:
gem "graphql", github: "rmosolgo/graphql-ruby", branch: "defer-directive"
gem "graphql-streaming"
```

## Usage

### Defer & Stream

This gem supports [queries with `@defer` and `@stream`](https://youtu.be/ViXL0YQnioU?t=12m49s).

- Use `GraphQL::Execution::DeferredExecution`:

  ```ruby
  # only available in the defer-directive branch:
  MySchema.query_execution_strategy = GraphQL::Execution::DeferredExecution
  MySchema.subscription_execution_strategy = GraphQL::Execution::DeferredExecution
  ```

- Choose a transport ([HTTP Streaming](#http-streaming-transport) or [ActionCable](#actioncable-transport)) and get its client (both built-in clients depend on `Object.assign`)

#### HTTP Streaming Transport

`StreamCollector` uses `stream.write(patch)` to send chunked responses to the client. For example, you can use it with [`ActionController::Live`](http://api.rubyonrails.org/classes/ActionController/Live.html).

Create a `collector` and include it in the query as `context[:collector]`:

```ruby
class ChunkedGraphqlsController < ApplicationController
  include ActionController::Live

  def create
    # ...
    # initialize the collector with `response.stream`
    context = {
      collector: StreamCollector.new(response.stream)
    }

    Schema.execute(query_string, variables: variables, context: context)

    # close the stream when the query is done:
    response.stream.close
  end
end
```

From JavaScript, use `StreamingGraphQLClient` to fetch data:

```js
//= require graphql-streaming/streaming_graphql_client

var onResponse = function(response) {
  // Handle response.errors / response.data
}

StreamingGraphQLClient.fetch(
  "/graphql/",
  `query getPost($postId: Int!){
    post(id: $postId) {
      title
      comments @stream {
        body
      }
    }
  }`,
  {postId: 1},
  onResponse,
)
```

The `onResponse` function will be called with the GraphQL response after each patch is added.

#### ActionCable Transport

You can use [Rails 5's `ActionCable`](http://edgeguides.rubyonrails.org/action_cable_overview.html) to send and receive GraphQL.

In your channel, implement an action called `#fetch(data)` for executing GraphQL queries. It should add an `ActionCableCollector` as `context[:collector]`, using `data["query_id"]` from the client. For example:

```ruby
class GraphqlChannel < ApplicationCable::Channel
  # ...

  def fetch(data)
    query_string = data["query"]
    variables = ensure_hash(data["variables"] || {})
    context = {}

    # Get the query ID, which is added by the GraphQLChannel client
    query_id = data["query_id"]

    # Get a broadcaster, which is the target for patches
    broadcaster = ActionCable.server.broadcaster_for(channel_name)

    # The collector passes patches on to the broadcaster
    context[:collector] = GraphQL::Streaming::ActionCableCollector.new(query_id, broadcaster)

    # Run the query
    Schema.execute(query_string, variables: variables, context: context)

    # Tell the client to stop listening for patches
    context[:collector].close
  end
end
```

Then, create a `GraphQLChannel` to make requests. `GraphQLChannel.subscription` contains defaults for `App.cable.subscriptions.create`:

```js
//= require graphql-streaming/graphql_channel
App.graphqlChannel = App.cable.subscriptions.create("GraphqlChannel", GraphQLChannel.subscription)

// OPTIONAL forward log messages to console.log:
// GraphQLChannel.log = console.log.bind(console)
```

And you can provide overrides if you want:

```js
// Trigger `graphql-channel:ready` when the channel is connected
App.graphqlChannel = App.cable.subscriptions.create(
  "GraphqlChannel",
  Object.assign(GraphQLChannel.subscription, {
    connected: function() {
      $(document).trigger("graphql-channel:ready")
    },
  })
)
```

Send queries with `graphqlChannel.fetch`:

```js
var queryString = "{ ... }"
var queryVariables = { /* ... */ }
var onResponse = function(response) { /* handle response.errors & response.data */ }
App.graphqlChannel.fetch(queryString, queryVariables, onResponse)
```

The `onResponse` handler will be called with the whole response _each time_ a patch is received.

### Subscription

`ActionCableSubscriber` uses `ActionCable` as a backend for GraphQL subscriptions. There are three parts:

- Send a __subscriber__ along with your query
- Define a __Subscription type__ which registers subscriptions during resolve
- Make __triggers__ from application code

#### Subscriber

The subscriber rides along with your query (as `context[:subscriber]`). It listens for triggers from the application, and when they happen, it re-evaluates the query and pushes an update over its channel.
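
To make the "listen, re-evaluate, push" cycle concrete, here is a toy sketch in plain Ruby. It is not the gem's implementation: `ToySubscriber`, its in-memory registry, and the Array standing in for a channel are all hypothetical names invented for illustration.

```ruby
# Toy illustration only: ToySubscriber is hypothetical and NOT part of graphql-streaming.
class ToySubscriber
  # Maps [field, arguments] to the subscribers listening for that trigger
  @registry = Hash.new { |hash, key| hash[key] = [] }

  class << self
    attr_reader :registry

    # Called from application code: update every matching subscriber
    def trigger(field, args)
      registry[[field, args]].each(&:push_update)
    end
  end

  # `channel` is anything that responds to `<<` (an Array in this sketch).
  # The block re-evaluates the query and returns a fresh result.
  def initialize(channel, query_id, &reevaluate)
    @channel = channel
    @query_id = query_id
    @reevaluate = reevaluate
  end

  # Would be called while a subscription field resolves
  def register(field, args)
    self.class.registry[[field, args]] << self
  end

  def subscribed?
    self.class.registry.any? { |_key, subscribers| subscribers.include?(self) }
  end

  # Re-run the query and send the new result to the client
  def push_update
    @channel << { query_id: @query_id, result: @reevaluate.call }
  end
end

channel = []
title = "First draft"
subscriber = ToySubscriber.new(channel, "query-1") do
  { "post" => { "title" => title } }
end
subscriber.register(:post, { id: 1 })

title = "Final version"
ToySubscriber.trigger(:post, { id: 1 }) # matches: the block runs again and an update is pushed
ToySubscriber.trigger(:post, { id: 2 }) # nobody subscribed to id 2: nothing is pushed
```

The block is the important part of the design: the subscriber does not cache a result, it re-runs the query on every matching trigger, so clients always receive current data. With the gem itself, the subscriber is attached to the query context:
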
```ruby
# Get the query ID from the client
query_id = data["query_id"]

context = { current_user: user_from_session(session) }

context[:subscriber] = GraphQL::Streaming::ActionCableSubscriber.new(self, query_id) do
  # This block is called when a matching trigger occurs
  context[:current_user].reload
  Schema.execute(query_string, variables: variables, context: context)
end

Schema.execute(query_string, variables: variables, context: context)

# If there are no outstanding subscriptions,
# tell the client to stop listening for patches
if !context[:subscriber].subscribed?
  context[:collector].close
end
```

#### Subscription Type

`SubscriptionType` is a plain `GraphQL::ObjectType`, but its fields are special. They correspond to application triggers. When a trigger is fired, any subscriber who is listening to the corresponding field will re-evaluate its query.

Define subscription fields with `subscription`:

```ruby
SubscriptionType = GraphQL::ObjectType.define do
  name "Subscription"
  subscription :post, PostType do
    argument :id, !types.Int
    resolve -> (obj, args, ctx) {
      Post.find(args[:id])
    }
  end
end

MySchema = GraphQL::Schema.new(subscription: SubscriptionType ...)
```

#### Triggers

From your application, you can trigger events on subscription fields. For example, to tell clients that a Post with a given ID changed:

```ruby
class Post
  def after_commit
    GraphQL::Streaming::ActionCableSubscriber.trigger(:post, {id: id})
  end
end
```

The arguments passed to `.trigger` will be tested against field arguments. Any subscribers who requested a matching query will be updated. For example:

```graphql
subscription {
  post(id: 1) {
    ... postFields
  }
}
```

would be updated by

```ruby
GraphQL::Streaming::ActionCableSubscriber.trigger(:post, {id: 1})
```

## Development

- `bundle exec rake test` to run the tests
- Sadly, no JS tests! See the [demo repo](https://github.com/rmosolgo/graphql-ruby-stream-defer-demo) for poke-around testing

## TODO

- What happens to subscriptions when you redeploy or ActionCable loses its connection? Need to handle reconnecting in some way.
- Handle errors in subscriber block
- Improve middleware so you don't have to manually close `ActionCableCollector`s
- Tests for JS?
- Other platforms (Pusher, HTTP/2)?
- Public alternative to `@channel.send(:transmit, payload)`?

## License

The gem is available as open source under the terms of the [MIT License](http://opensource.org/licenses/MIT).