In the previous proposal, we proposed to introduce a framework for OpenFunction to convert "functions" to "applications" - the functions-framework. The functions-framework did this well in OpenFunction v0.4.0 and earlier.
However, we need to add some necessary capabilities to function because OpenFunction is evolving and the demand from the community is increasing. We can use functions-framework to implement most of these new features, thanks to the uniqueness of functions-framework in the lifecycle of OpenFunction.
Function tracing capabilities are very typical of these features.
We need to add observability to function, which helps to observe and track the operation of functions in large scale scenarios.
Referring to #7, we can take the form of a plug-in in the function-frame to wake up the observability component to run at a specific timing.
For example, in functions-framework-go, we can add a plugin hook before and after the function is run, and run the logic related to the observability component in the hook.
Reference this:
func registerHTTPFunction(path string, fn func(http.ResponseWriter, *http.Request), h *http.ServeMux) error {
h.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
defer recoverPanicHTTP(w, "Function panic")
// execute pre-run plugins
fn(w, r)
// execute post-run plugins
})
return nil
}
This is just a simple demonstration and I will follow up with how to refactor the functions-framework so that it can achieve this goal.
- OpenFunction#146
- OpenFunction/functions-framework#8
- OpenFunction/functions-framework#9
- OpenFunction/functions-framework#12
- design a common interface in functions-framework-go to support the tracing capabilities of SkyAPM/Go2Sky and OpenTelemetry
- improve functions-framework-nodejs (to support observability capabilities)
- improve builder (rewrite the builder to make it better support functions-framework)
I'll use function-framework-go
as an example to illustrate how functions-framework works in this proposal:
|-- framework
| `-- framework.go
|
|-- context
| `-- context.go
|
|-- runtime
| |-- async
| | `-- async.go
| |-- knative
| | `-- knative.go
| `-- interface.go
|
`-- plugins
|-- plugin-a
| `-- plugin-a.go
|-- plugin-b
| `-- plugin-b.go
`-- interface.go
framework: Contains logic for handling the lifecycle of the conversion of function to application.
context: Contains logic for handling OpenFunction Context related matters.
runtime: Contains logic for handling function conversions on different serving runtimes.
plugin: Contains logic for handling plugins. Stores the default plugins.
Responsible for handling the lifecycle of the function conversion logic, which provides the following capabilities:
- Register functions and associate user functions to runtime
- Scan and register user-defined plugins and the default plugins
- Fetch the plugins and call the plugin interface to execute the plugins
- Execute pre-phase plugins before function execution
- Execute post-phase plugins after function execution
- Start the runtime
We use a structure called functionsFrameworkImpl
to handle the above logic.
type functionsFrameworkImpl struct {
ofContext ofctx.Context
prePlugins []plugin.Plugin
postPlugins []plugin.Plugin
runtime runtime.Interface
}
functionsFrameworkImpl
structure is responsible for storing context information (ofContext), extension or plugin information (prePlugins, postPlugins) during function conversion lifecycle.
The following steps need to be completed during the initialization phase of functionsFrameworkImpl
:
- Parse OpenFunction Context
- Scan the local directory and register the plugins if exist. Register the framework default plugins under
plugin
directory - Create runtime
func InitFramework() *functionsFrameworkImpl {
fwk := &functionsFrameworkImpl{}
// Parse OpenFunction Context
if err := parseOpenFunctionContext(fwk); err != nil {
klog.Errorf("failed to get OpenFunction Context: %v\n", err)
return nil
}
// Scan the local directory and register the plugins if exist
// Register the framework default plugins under `plugin` directory
scanAndRegisterPlugins(fwk)
// Create runtime
if err := createRuntime(fwk); err != nil {
klog.Errorf("failed to create runtime: %v\n", err)
return nil
}
return fwk
}
functionsFrameworkImpl
contains the following key functions:
Register()
is used to register user functionprocessPreHooks()
is used to execute pre-phase hooksprocessPostHooks()
is used to execute post-phase hooksStart()
is used to start runtime
func (fwk *functionsFrameworkImpl) Register(ctx context.Context, fn interface{}) error {
if fnHTTP, ok := fn.(func(http.ResponseWriter, *http.Request) error); ok {
if err := fwk.runtime.RegisterHTTPFunction(fwk.ofContext, fwk.processPreHooks, fwk.processPostHooks, fnHTTP); err != nil {
klog.Errorf("failed to register function: %v", err)
return err
}
} else if fnOpenFunction, ok := fn.(func(ctx ofctx.Context) (ofctx.Out, error)); ok {
if err := fwk.runtime.RegisterOpenFunction(fwk.ofContext, fwk.processPreHooks, fwk.processPostHooks, fnOpenFunction); err != nil {
klog.Errorf("failed to register function: %v", err)
return err
}
} else if fnCloudEvent, ok := fn.(func(context.Context, cloudevents.Event) error); ok {
if err := fwk.runtime.RegisterCloudEventFunction(ctx, fwk.ofContext, fwk.processPreHooks, fwk.processPostHooks, fnCloudEvent); err != nil {
klog.Errorf("failed to register function: %v", err)
return err
}
} else {
err := errors.New("unrecognized function")
klog.Errorf("failed to register function: %v", err)
return err
}
return nil
}
func (fwk *functionsFrameworkImpl) processPreHooks() error {
for _, plg := range fwk.prePlugins {
if err := plg.ExecPreHook(fwk.ofContext); err != nil {
klog.Errorf("failed to exec plugin %s: %s", plg.Name(), err.Error())
}
}
return nil
}
func (fwk *functionsFrameworkImpl) processPostHooks() error {
for _, plg := range fwk.postPlugins {
if err := plg.ExecPostHook(fwk.ofContext); err != nil {
klog.Errorf("failed to exec plugin %s: %s", plg.Name(), err.Error())
}
}
return nil
}
func (fwk *functionsFrameworkImpl) Start(ctx context.Context) error {
err := fwk.runtime.Start(ctx)
if err != nil {
klog.Error("failed to start runtime service")
return err
}
return nil
}
So in the "Application Template" we need to call functionsFrameworkImpl.Register()
to complete the registration of the user function, and then call functionsFrameworkImpl.Start()
to start the runtime. (This is almost identical to the current process. I will talk about it in a later section.)
Responsible for maintaining the OpenFunction Context structure, which provides the following capabilities:
- Parse the OpenFunction Context
- Dock to the function input source and direct the input data to the function
- Dock to the function output target and direct the function data to the output target
- Provide a list of plugins (in order) for the pre-phase
- Provide a list of plugins (in order) for the post-phase
- Encapsulate the return of function
- Encapsulate CloudEvents, Dapr bindings events, Dapr pubsub events (CloudEvents-based) format events.
The following is the structure of the OpenFunction Context
:
I suggest changing the name from
OpenFunctionContext
toContext
.
type Context struct {
Name string `json:"name"`
Version string `json:"version"`
RequestID string `json:"requestID,omitempty"`
Ctx context.Context `json:"ctx,omitempty"`
Inputs map[string]*Input `json:"inputs,omitempty"`
Outputs map[string]*Output `json:"outputs,omitempty"`
Runtime Runtime `json:"runtime"`
Port string `json:"port,omitempty"`
State interface{} `json:"state,omitempty"`
EventMeta *EventMetadata `json:"event,omitempty"`
SyncRequestMeta *SyncRequestMetadata `json:"syncRequest,omitempty"`
Plugins []string `json:"plugins,omitempty"`
PluginState map[string]plugin.Plugin `json:"pluginState,omitempty"`
Out Out `json:"out,omitempty"`
Error error `json:"error,omitempty"`
DaprClientPort string `json:"daprClientPort,omitempty"`
HttpPattern string `json:"httpPattern,omitempty"`
daprClient dapr.Client
}
type EventMetadata struct {
InputName string `json:"inputName,omitempty"`
BindingEvent *common.BindingEvent `json:"bindingEvent,omitempty"`
TopicEvent *common.TopicEvent `json:"topicEvent,omitempty"`
CloudEvent *cloudevents.Event `json:"cloudEventnt,omitempty"`
}
type SyncRequestMetadata struct {
ResponseWriter http.ResponseWriter `json:"responseWriter,omitempty"`
Request *http.Request `json:"request,omitempty"`
}
type Input struct {
Uri string `json:"uri,omitempty"`
Component string `json:"component,omitempty"`
Type ResourceType `json:"type"`
Metadata map[string]string `json:"metadata,omitempty"`
}
type Output struct {
Uri string `json:"uri,omitempty"`
Component string `json:"component,omitempty"`
Type ResourceType `json:"type"`
Metadata map[string]string `json:"metadata,omitempty"`
Operation string `json:"operation,omitempty"`
}
Compared to the existing Context specification, this proposal has the following changes:
- Add
Ctx
to hold the instance of the native standard context library EventMeta
andSyncRequestMeta
, which hold metadata information for event requests and http requestsPlugins
, used to store the names of the plugins to be implemented in pre-phase and post-phase in orderPluginState
is used to store intermediate state data for plugins during function conversionsError
, used to indicate whether the function is in an exception state after execution, a null value means no exceptions
We still need a structure that encapsulates the return value of the function, which will be renamed from RetValue
to Out
.
A useful approach is that passing information (return values) between functions in the form of json via Out.Data
:
type Out struct {
Code ReturnCode `json:"code"`
Data []byte `json:"data,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
Error string `json:"error,omitempty"`
}
I suggest adding a "custom return value declaration" feature to the processing of the return part of the function. In the function workflow, it would make sense to make a judgment based on the return value of the function. (We can disregard this suggestion for now, as we may need a higher-level framework to implement it)
There are alternative ways to do this:
- Create a file with the custom struct by using
buildpacks
- As mentioned above we will use
Out.Data
to pass data in json format so that user can declare the names of the returned values inOut.Metadata
and usejsonpath
to get their values.
Refer to this example. The following is the information in
Out.Data
.
{
"firstName": "John",
"lastName" : "doe",
"age" : 26,
"address" : {
"streetAddress": "naist street",
"city" : "Nara",
"postalCode" : "630-0192"
},
"phoneNumbers": [
{
"type" : "iPhone",
"number": "0123-4567-8888"
},
{
"type" : "home",
"number": "0123-4567-8910"
}
]
}
If the user needs to declare a custom return value, he needs to define the following in Out.Metadata
:
Out.Metadata = map[string]string{
"mySecondPhoneType": "$.phoneNumbers[:1].type",
"my-age": "$.age",
}
Context
contains the only key function:
Send()
is used to send data to the specified output destination.
func (ctx *Context) Send(outputName string, data []byte) ([]byte, error) {
if ctx.OutputIsEmpty() {
return nil, errors.New("no output")
}
var err error
var output *Output
var response *dapr.BindingEvent
client := ctx.GetDaprClient()
if v, ok := ctx.Outputs[outputName]; ok {
output = v
} else {
return nil, fmt.Errorf("output %s not found", outputName)
}
switch output.Type {
case OpenFuncTopic:
err = client.PublishEvent(context.Background(), output.Component, output.Uri, data)
case OpenFuncBinding:
in := &dapr.InvokeBindingRequest{
Name: output.Component,
Operation: output.Operation,
Data: data,
Metadata: output.Metadata,
}
response, err = client.InvokeBinding(context.Background(), in)
}
if err != nil {
return nil, err
}
if response != nil {
return response.Data, nil
}
return nil, nil
}
We can also provide a function to help the user get data from a specified input source, depending on whether it is necessary for us to encapsulate the input information.
This is because there is no additional cost to the user (possibly a more native way) to get the raw data from the input source via Context.EventMeta
or Context.SyncRequestMeta
.
Responsible for handling plugins, which provides the following capabilities:
- Register plugins (initialize and provide a plugin instance to
functionsFrameworkImpl
) - Execute pre-run plugins
- Execute post-run plugins
- Store and retrieve intermediate state of plugins
In the Plugin
interface, we need to provide the following metadatas:
- Name() as plugin's name
- Version() as plugin's version
In addition, the plugin needs to provide an interface ExecPreHook()
for executing the pre-phase logic and an interface ExecPostHook()
for executing the post-phase logic.
The Get()
interface is used to provide the ability to access intermediate data of plugin self for other plugins.
type Metadata interface {
Name() string
Version() string
}
type Plugin interface {
Metadata
ExecPreHook(ctx ofctx.Context) error
ExecPostHook(ctx ofctx.Context) error
Get(fieldName string) (interface{}, bool)
}
During the initialisation process, functionsFrameworkImpl
parses the OpenFunction Context
and obtains the list of plugins that will be used in the pre-phase and post-phase from Context
. It then scans and registers the plugins via scanAndRegisterPlugins()
.
We support the use of user-defined plugins, which is achieved by placing the plugin file in the agreed directory.
The registered plugins are stored in the corresponding functionsFrameworkImpl.prePlugins
and functionsFrameworkImpl.postPlugins
for the framework to use in the processPrePlugins()
and processPostPlugins()
phases.
The following is an example of a plugin.
The plugin called "plugin-a" provides data of two custom intermediate states: stateA and stateB.
In the pre-phase, "plugin-a" stores int64 data 1 to stateA and an instance of context.Context to stateB.
In the post-phase, "plugin-a" fetches the value of the data stateC from the plugin named "plugin-b", and adds it to the value of stateA.
package plugin_a
import (
"context"
"fmt"
"github.com/fatih/structs"
"k8s.io/klog/v2"
ofctx "github.com/OpenFunction/functions-framework-go/context"
)
const (
name = "plugin-a"
version = "v1"
)
type PluginA struct {
PluginName string
PluginVersion string
stateA int64
stateB context.Context
}
func NewPluginA() *PluginA {
return &PluginA{
PluginName: name,
PluginVersion: version,
}
}
func (p *PluginA) Name() string {
return p.PluginName
}
func (p *PluginA) Version() string {
return p.PluginVersion
}
func (p *PluginA) ExecPreHook(ctx ofctx.Context) error {
r := preHookLogic(ctx.Ctx)
p.stateA = 1
p.stateB = r
return nil
}
func (p *PluginA) ExecPostHook(ctx ofctx.Context) error {
// Get data from another plugin via Plugin.Get()
plgName := "plugin-b"
keyName := "stateC"
v, ok := ctx.PluginState[plgName].Get(keyName)
if ok {
stateC := v.(int64)
postHookLogic(p.stateA, stateC)
} else {
return fmt.Errorf("failed to get %s from plugin %s", keyName, plgName)
}
return nil
}
func (p *PluginA) Get(fieldName string) (interface{}, bool) {
plgMap := structs.Map(p)
value, ok := plgMap[fieldName]
return value, ok
}
func preHookLogic(ctx context.Context) context.Context {
if ctx != nil {
return ctx
} else {
return context.Background()
}
}
func postHookLogic(numA int64, numB int64) int64 {
sum := numA + numB
klog.Info("the sum is: %d", sum)
return sum
}
In order to make the plugin mechanism more generic, the author of the plugin is required to make a declaration of intermediate state data based on the version of the plugin.
For synchronous functions, we provide two layouts of function signatures that support native requests:
Receive http.ResponseWriter
, *http.Request
and return function's exception status error
func(http.ResponseWriter, *http.Request) error
Receive context.Context
, cloudevents.Event
and return function's exception status error
func(context.Context, cloudevents.Event) error
In addition, if we adopt OpenFunction Context
's improved solution for request metadata information (i.e. using Context.EventMeta
to store raw event request data and Context.SyncRequestMeta
to store raw HTTP request data), then we can stipulate the function signatures as follows:
The second parameter is used to pass the content of the event to the function, which can be:
- The
body
of the HTTP requestdata
in dapr.TopicEventdata
in dapr.bindingEventThe user can choose to read and use the data directly from this parameter, or to fetch the raw data from the
Context.EventMeta
orContext.SyncRequestMeta
. In functions with no input source, the second parameter is a null value (nil in Go).
func(ofctx.Context, []byte) (ofctx.Out, error)
Next I will show how the framework registers functions with a unified function signature in conjunction with the Runtime.
In this case, the raw data of the request (event) will be carried by the
OpenFunction Context
and throughout the framework (including the pre-phase part, the function logic part, and the post-phase part)
Let's start by looking at the interfaces in the Runtime section:
Start()
is used to start the runtime service.RegisterOpenFunction()
is used to register functions that use the OpenFunction Context (enabled in both Knative and Async runtimes)RegisterHTTPFunction()
is used to register functions for handling HTTP requests (only enabled in the Knative runtime)RegisterCloudEventFunction()
is user to register function which handling CloudEvents (only enabled in the Knative runtime)
type Interface interface {
Start(ctx context.Context) error
RegisterHTTPFunction(
ctx ofctx.Context,
processPreHooksFunc func() error,
processPostHooksFunc func() error,
fn func(http.ResponseWriter, *http.Request) error,
) error
RegisterOpenFunction(
ctx ofctx.Context,
processPreHooksFunc func() error,
processPostHooksFunc func() error,
fn func(ofctx.Context, []byte) (ofctx.Out, error),
) error
RegisterCloudEventFunction(
ctx context.Context,
ofContext ofctx.Context,
processPreHooksFunc func() error,
processPostHooksFunc func() error,
fn func(context.Context, cloudevents.Event) error,
) error
}
The following is an implementation of the RegisterOpenFunction()
in the async runtime:
func (r *Runtime) RegisterOpenFunction(
ctx ofctx.Context,
processPreHooksFunc func() error,
processPostHooksFunc func() error,
fn func(ofctx.Context, []byte) (ofctx.Out, error),
) error {
// Register the asynchronous functions (based on the Dapr runtime)
return func(f func(ofctx.Context, []byte) (ofctx.Out, error)) error {
var funcErr error
// Initialize dapr client if it is nil
ofctx.InitDaprClientIfNil(ctx)
// Serving function with inputs
if !ctx.InputsIsEmpty() {
for name, input := range ctx.Inputs {
switch input.Type {
case ofctx.OpenFuncBinding:
input.Uri = input.Component
funcErr = r.handler.AddBindingInvocationHandler(input.Uri, func(c context.Context, in *dapr.BindingEvent) (out []byte, err error) {
ctx.EventMeta.InputName = name
ctx.EventMeta.BindingEvent = in
if err := processPreHooksFunc(); err != nil {
// Just logging errors
}
ctx.Out, ctx.Error = f(ctx, in.Data)
if err := processPostHooksFunc(); err != nil {
// Just logging errors
}
switch ctx.Out.Code {
case ofctx.Success:
return ctx.Out.Data, nil
case ofctx.InternalError:
return nil, ctx.Out.Error
default:
return nil, nil
}
})
case ofctx.OpenFuncTopic:
sub := &dapr.Subscription{
PubsubName: input.Component,
Topic: input.Uri,
}
funcErr = r.handler.AddTopicEventHandler(sub, func(c context.Context, e *dapr.TopicEvent) (retry bool, err error) {
ctx.EventMeta.InputName = name
ctx.EventMeta.TopicEvent = e
if err := processPreHooksFunc(); err != nil {
// Just logging errors
}
ctx.Out, ctx.Error = f(ctx, convertTopicEventToByte(e.Data))
if err := processPostHooksFunc(); err != nil {
// Just logging errors
}
switch ctx.Out.Code {
case ofctx.Success:
return false, nil
case ofctx.InternalError:
err = ctx.Out.Error
if retry, ok := ctx.Out.Metadata["retry"]; ok {
if strings.EqualFold(retry, "true") {
return true, err
} else if strings.EqualFold(retry, "false") {
return false, err
} else {
return false, err
}
}
return false, err
default:
return false, nil
}
})
default:
return fmt.Errorf("invalid input type: %s", input.Type)
}
if funcErr != nil {
// When the function throws an exception,
// first call client.Close() to close the dapr client,
// then set fwk.ofContext.daprClient to nil
ofctx.DestroyDaprClient(ctx)
klog.Errorf("failed to add dapr service handler: %v\n", funcErr)
return funcErr
}
}
// Serving function without inputs
} else {
if err := processPreHooksFunc(); err != nil {
// Just logging errors
}
ctx.Out, ctx.Error = f(ctx, nil)
if err := processPostHooksFunc(); err != nil {
// Just logging errors
}
switch ctx.Out.Code {
case ofctx.Success:
return nil
case ofctx.InternalError:
return ctx.Out.Error
default:
return nil
}
}
return nil
}(fn)
}
func convertTopicEventToByte(data interface{}) []byte {
if d, ok := data.([]byte); ok {
return d
}
if d, err := json.Marshal(data); err != nil {
return nil
} else {
return d
}
}
And the implementation in the Knative runtime:
func (r *Runtime) RegisterOpenFunction(
ctx ofctx.Context,
processPreHooksFunc func() error,
processPostHooksFunc func() error,
fn func(ofctx.Context, []byte) (ofctx.Out, error),
) error {
// Register the synchronous function (based on Knaitve runtime)
return func(f func(ofctx.Context, []byte) (ofctx.Out, error)) error {
r.handler.HandleFunc(r.pattern, func(w http.ResponseWriter, r *http.Request) {
ctx.SyncRequestMeta.ResponseWriter = w
ctx.SyncRequestMeta.Request = r
defer RecoverPanicHTTP(w, "Function panic")
if err := processPreHooksFunc(); err != nil {
// Just logging errors
}
ctx.Out, ctx.Error = f(ctx, convertRequestBodyToByte(r))
if err := processPostHooksFunc; err != nil {
// Just logging errors
}
switch ctx.Out.Code {
case ofctx.Success:
w.Header().Set(functionStatusHeader, successStatus)
return
case ofctx.InternalError:
w.Header().Set(functionStatusHeader, errorStatus)
w.WriteHeader(int(ctx.Out.Code))
return
default:
return
}
})
return nil
}(fn)
}
func convertRequestBodyToByte(r *http.Request) []byte {
body, err := io.ReadAll(r.Body)
if err != nil {
return nil
}
return body
}
- Implement functions-frameworks in Go (as known as functions-framework-go)
- Write buildpacks that handle registering custom plugins
- Design and build the functions-framework ci\cd pipeline and complete the co-validation of the functions-framework with the builder
- Integration the SkyAPM/Go2Sky plugin
- Implement functions-frameworks in Nodejs (functions-framework-nodejs) and Python (functions-framework-python)