riff is for functions

Version (v0.6 snapshot)

Streaming Runtime

The streaming runtime allows execution of functions on streams of messages, permitting more complex interactions than the simple request / reply used by the core or knative runtimes.

Install

The Streaming runtime is not installed with riff by default. See the getting started guide for how to install the prerequisites and riff Build in your Kubernetes environment.

You can then install the Streaming runtime and its dependencies using the following:

kapp deploy -n apps -a keda -f https://storage.googleapis.com/projectriff/release/0.6.0-snapshot/keda.yaml
kapp deploy -n apps -a riff-streaming-runtime -f https://storage.googleapis.com/projectriff/release/0.6.0-snapshot/riff-streaming-runtime.yaml

NOTE: Not all invokers support streaming. Only invokers that conform to the full invoker specification can be used with the streaming runtime. In particular, the command invoker does not support streaming.

Gateways

When using the streaming runtime, messages flow between functions using streams, which are backed by some concrete messaging system, such as Kafka. Gateways abstract away the protocol details of concrete messaging systems (riff uses liiklus to that effect), as well as the way to provision topics/queues corresponding to each stream.

Gateways are namespaced resources and manage streams in their own namespace. They typically target their own messaging system, but are implemented in such a way that two different gateway instances could use the same broker (e.g. the same Kafka cluster) without interference. Conversely, functions can interact with streams managed by two (or more) different gateways.

The riff streaming runtime reconciles some gateways itself, while extension gateways are expected to be handled by custom controllers. The configuration each kind of gateway needs also varies greatly with the backing message broker. For both of these reasons, gateways are in general created "manually" from YAML files; the riff CLI only offers commands for specific gateway kinds, such as the KafkaGateway described below. Installing the message brokers themselves (inside or outside the cluster) is outside the scope of this document.

KafkaGateway

As of riff 0.5.x, one kind of Gateway available is KafkaGateway, which maps each riff stream to a Kafka topic.

If you don't have Kafka installed in your cluster, you can create a single-node Kafka install using Helm 3 and the Helm incubator chart for Apache Kafka. To install Helm 3, follow the instructions in the Helm GitHub repo. Once Helm 3 is installed, run the following commands to install a single-node Kafka deployment:

helm repo add incubator https://storage.googleapis.com/kubernetes-charts-incubator
kubectl create namespace kafka
helm install kafka --namespace kafka incubator/kafka --set replicas=1 --set zookeeper.replicaCount=1 --wait

The easiest way to create a KafkaGateway is using the riff CLI:

riff streaming kafka-gateway create franz --bootstrap-servers kafka.kafka:9092 --tail
Created kafka gateway "franz"
Waiting for kafka gateway "franz" to become ready...
...
KafkaGateway "franz" is ready

Streams

Streams are namespaced resources that allow the flow of (and typically persist) messages serialized by riff streaming. Each stream has a content-type assigned to it and only messages compatible with that MIME type are allowed on that stream.

To declare a stream (and, where needed, provision backing resources in the concrete message broker supporting it), use the riff CLI and name the gateway that should manage it.

Building on the example above, here is how to create two streams, named in and out respectively, both managed by the franz gateway:

riff streaming stream create in  --gateway franz --content-type application/json
riff streaming stream create out --gateway franz --content-type application/json

Processors

Processors are the glue between streams and functions. An instance of a processor tells the streaming runtime that a given function should react to messages flowing on its input stream(s) and forward results to its output stream(s).

Upon creation of a processor, a deployment is created that hosts both the function (with its dedicated invoker) and a sidecar container running the streaming processor.

The role of the sidecar is to connect to each stream using a reactive API and to invoke the function using the riff streaming RPC protocol. The function is invoked once per window, and it is the responsibility of the streaming processor sidecar to chop inputs into windows.

NOTE: The windowing function implemented by the streaming processor is currently hardcoded to create windows every minute.
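
To make the windowing idea concrete, here is a minimal sketch in shell. It is not the sidecar's actual implementation: it assumes each message comes with an arrival time in seconds (a hypothetical input format) and assigns it to a fixed one-minute window. Each distinct window would then correspond to one function invocation. The name assign_windows and the WINDOW_SECONDS variable are made up for this illustration.

```shell
#!/bin/sh
# Sketch only: bucket "arrival_seconds payload" lines into fixed windows.
# WINDOW_SECONDS mirrors the one-minute window mentioned in the note above;
# the input format and the function name are hypothetical.
WINDOW_SECONDS=60

assign_windows() {
  # The window index is the arrival time divided by the window length, rounded down.
  awk -v w="$WINDOW_SECONDS" '{ printf "window=%d payload=%s\n", int($1 / w), $2 }'
}

# Three messages arriving at 5s, 42s and 61s.
printf '%s\n' '5 10' '42 100' '61 2' | assign_windows
```

In this sketch the first two messages share window 0, while the third falls into window 1 and would be handed to the function in a separate invocation.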

Here is an example processor using a time-averager function that emits an average of the input numbers computed every 10 seconds:

riff function create time-averager \
  --git-repo https://github.com/projectriff-samples/time-averager.git \
  --handler com.acme.TimeAverager \
  --tail
riff streaming processor create time-averager \
  --function-ref time-averager \
  --input in \
  --output out

Testing with dev-utils

The dev-utils container offers CLI utilities which can be used to publish and subscribe to messages.

To run dev-utils in a pod called riff-dev in the default namespace:

kubectl create serviceaccount riff-dev
kubectl create rolebinding riff-dev-edit --clusterrole=edit --serviceaccount=default:riff-dev
kubectl run riff-dev --serviceaccount=riff-dev --generator=run-pod/v1 --image=projectriff/dev-utils

Using separate terminal windows, subscribe to the in and out streams to observe messages on those streams.

kubectl exec riff-dev -it -- subscribe in --payload-encoding raw
kubectl exec riff-dev -it -- subscribe out --payload-encoding raw

Publish numbers to the in stream.

kubectl exec riff-dev -it -- publish in --content-type application/json --payload 10
kubectl exec riff-dev -it -- publish in --content-type application/json --payload 100
kubectl exec riff-dev -it -- publish in --content-type application/json --payload 2

The subscriber on the out stream should show the resulting averages. (Your results may vary depending on window boundaries.)

$ kubectl exec riff-dev -it -- subscribe out --payload-encoding raw
{"payload":"55.0","contentType":"application/json","headers":{}}
{"payload":"2.0","contentType":"application/json","headers":{}}
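
The arithmetic behind that output can be checked with a small sketch. It assumes that 10 and 100 fell into the same averaging period while 2 landed in the next one. The average helper is hypothetical and is not part of dev-utils or of the time-averager sample.

```shell
#!/bin/sh
# Sketch only: arithmetic mean of the arguments, printed with one decimal.
average() {
  printf '%s\n' "$@" | awk '{ sum += $1 } END { printf "%.1f\n", sum / NR }'
}

average 10 100    # values that shared a period      -> 55.0
average 2         # value that arrived in the next   -> 2.0
average 10 100 2  # all three in one period instead  -> 37.3
```

The last call shows why results depend on where the boundaries fall: had all three values arrived within a single period, the subscriber would have seen one result of 37.3 instead of 55.0 followed by 2.0.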
Copyright © 2021 VMware, Inc