Functions

The platform's Functions let you run code inside the platform in response to events or triggers, without needing any client-side script.

Functions are useful when you want the platform to behave in a specific way in response to internal or external events, or when you want to add your own code to be executed on demand (typically by yourself), much as macros work in MS Excel, for instance.

With the platform's Functions, you simply upload your code and add the triggers on which you would like it to execute. Users can define data container-level Functions that are executed on certain events:

  • CRUD Functions for files and metadata: When files are created, read, updated or deleted.

    E.g.: Every time you upload one of your photographs to a certain data container:

      • Extract the date from the file name into the metadata catalogue, so it is searchable using the web interface or the API,

      • Calculate each file's integrity, and

      • Tag images that contain faces.

  • Periodic functions: Executed every minute, hour or day.

    E.g.: Webhooks to other systems

  • On-demand functions: When the user selects files using the GUI, or when the function is launched using the API.

    E.g.: For fetch.txt files containing a manifest of files to download, make the platform download them and place them inside the container.
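The metadata-extraction step in the first example above can start from plain Python before any platform API is involved. A minimal sketch (a hypothetical helper, assuming dates are embedded in file names as YYYY-MM-DD):

```python
import re
from datetime import datetime

def date_from_filename(name):
    """Extract a YYYY-MM-DD date embedded in a file name, if any."""
    match = re.search(r"(\d{4})-(\d{2})-(\d{2})", name)
    if match is None:
        return None
    try:
        # Reject impossible dates such as month 13
        return datetime(*map(int, match.groups())).date()
    except ValueError:
        return None
```

The returned date could then be written to the metadata catalogue so it becomes searchable.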

How to build a Function?

When building a platform function, two aspects are usually defined:

  • The interface or triggers that the function will respond to: when the function will be presented to the user, or how the platform will execute it.

  • The code that the function will execute

Let's build a sample function step by step:

Declaring the function in the interface

1. Go to Configuration and then to Functions:

2. Select Add function:

3. Give your function a meaningful name and description (both are shown to users in the interface, so make sure to keep them short and concise). Enable it.

4. Go to the triggers section and select the events that should trigger the function.

You have three main types of triggers:

  • On demand: The function is executed when the user selects "RUN" in the interface (or when the function is called using the API):

  • Automatically: On item (folder/file) create, delete, metadata change or storage policy change. The platform will automatically execute the function when, for instance, a new file is created. This is useful for automating certain actions.

  • Periodically: The platform will execute the function every minute, day, week, etc., as defined by a cron string.
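Cron strings follow the standard five-field format (minute, hour, day of month, month, day of week); the values below are illustrative:

```
*/5  *  *  *  *    # every five minutes
0    2  *  *  *    # daily at 02:00
0    0  *  *  1    # weekly, Mondays at midnight
```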

Select the type of trigger carefully following the instructions in the platform interface.

It is possible to select more than one trigger for the same function (when the function is executed, it receives the trigger type that activated it, so your code can decide how to proceed in each case).

Be extra careful when selecting triggers like "On item create": if not properly adjusted, they can generate huge workloads and problems. If you are in doubt, ask LIBNOVA.
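Since one function can serve several triggers, a common pattern is to branch on the trigger type the handler receives. A minimal sketch (`ON_DEMAND` is the value used for on-demand triggers, as shown in the API section below; the other two type names are illustrative assumptions, not platform constants):

```python
def dispatch(trigger_type):
    # Branch on the trigger type the platform passes along with the request.
    # "ON_ITEM_CREATE" and "PERIODIC" are assumed names for illustration.
    if trigger_type == "ON_DEMAND":
        return "run the user-requested work"
    if trigger_type == "ON_ITEM_CREATE":
        return "process the newly created item"
    if trigger_type == "PERIODIC":
        return "run the scheduled maintenance"
    return "ignore unknown trigger"
```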

5. Go to the parameters section and define the parameters that the function will accept. Parameters only apply to on-demand functions, where the user is asked to provide them when calling the function.

You have multiple types of parameters to choose from:
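Once declared, parameters can be read inside your function. In the sample function later on this page they are accessed as `request_helper.Parameters["params"][key]`, and accessing a parameter the user did not supply raises an exception. A defensive accessor might look like this (the nested dict below is a stand-in for `request_helper.Parameters`):

```python
# Stand-in for request_helper.Parameters, which nests user input under "params"
parameters = {"params": {"parameter_1": "Option one"}}

def get_param(parameters, key, default=None):
    # Fall back to a default instead of crashing the function when the
    # user did not supply the parameter.
    try:
        return parameters["params"][key]
    except (KeyError, TypeError):
        return default
```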

6. Add your code.

7. Select Create

How to code a Function?

Functions workflow

When you create a Function, the platform will precompile it, package it in a Docker container and deploy one or more instances (depending on the parameter Replicas in the Configuration tab).

This means that your functions are always running and ready to accept your workload. There is no warm-up when launching them.

When an execution request is received, the platform creates a Job and sends a message to a queue for execution. Any of the running instances/replicas will pick up the request and start processing it. Each instance/replica executes only one request at a time; when it finishes, it starts working on the next one.
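The dispatch model described above, several replicas each draining one request at a time from a shared queue, can be sketched with a plain in-process queue (purely illustrative; this is not platform code):

```python
import queue
import threading

jobs = queue.Queue()   # stands in for the platform's execution queue
processed = []

def replica(worker_id):
    # Each replica handles exactly one request at a time and only picks
    # up the next job once the current one is finished.
    while True:
        job = jobs.get()
        if job is None:          # sentinel: shut this replica down
            jobs.task_done()
            break
        processed.append((worker_id, job))
        jobs.task_done()

workers = [threading.Thread(target=replica, args=(i,)) for i in range(2)]
for w in workers:
    w.start()
for name in ["job-1", "job-2", "job-3"]:
    jobs.put(name)
for _ in workers:
    jobs.put(None)               # one sentinel per replica
for w in workers:
    w.join()
```

Adding more replicas (the Replicas parameter in the Configuration tab) simply adds more consumers draining the same queue.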

Function build/deploy cycle

Under the Code tab, you can create and edit your function. When you have added your code, simply select Save. This initiates compilation of the function, which usually takes a few minutes.

To see what is happening, you can select See function build log in the top right corner:

If everything goes well, you will see a green Ready message:

This means that your function has been compiled and that the specified number of Replicas is running.

If your code contains errors, the platform will set the function to an error state. You can scroll down in the message to get a hint about the problem:

Function's code structure

A Python library is available to make using the API easier and more convenient (see the library and its documentation). This library can be used in Functions, Jupyter Notebooks and in your own scripts.

Let's create a sample function.

First, initialize your function: set up your components and import any required dependencies. Then define the handler function, which wraps its logic in a try block:

#!/usr/bin/env python
# coding: utf-8

# General imports
import os, time, traceback
import json
import tempfile
import os.path as opath
import datetime
import hashlib
import urllib

# Platform functions
from libnova                         import common, Util
from libnova.common                  import nuclio
from libnova.common.nuclio           import Request
from libnova.common.api              import Driver, Container, File, Job, JobMessage
from libnova.common.filesystem       import S3
from libnova.common.filesystem.S3    import File as S3File, Storage, SeekableStream

from datetime import datetime

request_helper   = None

def handler(context, event):
    global request_helper

    request_helper   = None
    # ALWAYS wrap your code in a try block so it fails gracefully. Otherwise, your
    # function will end up in a loop and unintended things can happen.
    try:
        # Initialize the Request parser and the associated Job (see the full
        # sample below for details)
        request_helper = common.nuclio.Request.Request(context, event)
        request_helper.job_init()

        #       m     m  mmmm  m    m mmmmm           mmm   mmmm  mmmm   mmmmmm
        #        "m m"  m"  "m #    # #   "#        m"   " m"  "m #   "m #
        #         "#"   #    # #    # #mmmm"        #      #    # #    # #mmmmm
        #          #    #    # #    # #   "m        #      #    # #    # #
        #          #     #mm#  "mmmm" #    "         "mmm"  #mm#  #mmm"  #mmmmm

        #                      m    m mmmmmm mmmmm  mmmmmm
        #                      #    # #      #   "# #
        #                      #mmmm# #mmmmm #mmmm" #mmmmm
        #                      #    # #      #   "m #
        #                      #    # #mmmmm #    " #mmmmm

        # At the end, you should tell the platform that you have finished fine (True) or failed (False)
        request_helper.job_end(True)

    except Exception as e:
        if request_helper is not None:
            request_helper.log(
                "An unhandled exception has occured: \n" +
                repr(e) + "\n" +
                traceback.format_exc(),
                JobMessage.JobMessageType.ERROR
            )
            request_helper.job_end(False)

When the function is called, the handler method is invoked first:

def handler(context, event):

and your logic goes under it.

It is important that you enclose all your logic in a try block. If you don't, unexpected things will happen when your code has errors: the function will restart every time you build it, re-using the same job id, which will create repeated entries, and so on. In other words, a mess you will usually want to avoid. Always remember to add it.

Sample function source code

Here is the full code of a working function that demonstrates these capabilities. We have added comments explaining every step:

#!/usr/bin/env python
# coding: utf-8

# General imports
import os, time, traceback
import json
import tempfile
import os.path as opath
import datetime
import hashlib
import urllib

# Platform functions
from libnova                         import common, Util
from libnova.common                  import nuclio
from libnova.common.nuclio           import Request
from libnova.common.api              import Driver, Container, File, Job, JobMessage
from libnova.common.filesystem       import S3
from libnova.common.filesystem.S3    import File as S3File, Storage, SeekableStream

from datetime import datetime

request_helper   = None

def handler(context, event):
    global request_helper

    request_helper   = None
    # ALWAYS wrap your code in a try block so it fails gracefully. Otherwise, your
    # function will end up in a loop and unintended things can happen.
    try:
        # Initialize the Request parser
        #
        # This will automatically parse the data sent by the platform to this function, like the File ID,
        # the Job ID, or the User ID who triggered this function, etc.
        #
        # It will also initialize the API Driver using the user API Key
        request_helper   = common.nuclio.Request.Request(context, event)
        request_helper.job_init()
        
        # You can log to the function output (visible from the web interface)
        request_helper.log("Hello world!", JobMessage.JobMessageType.INFO)
        
        # There are different types of messages you can write. You may need to write an error message!
        request_helper.log("This is an error message :)", JobMessage.JobMessageType.ERROR)
        
        
        # The platform is going to provide you the context in which your Function has been called
        
        log_message = ("Function's context: \n" +
            "> User that is launching the function              : " + request_helper.User.id + "\n" +
            "> Container in which the function has been launched: " + request_helper.Container.id + "\n" +
            "> Trigger that is triggering this function         : " + request_helper.Trigger.id + "\n" +
            "> Trigger type                                     : " +  request_helper.Trigger.type + "\n" +
            "> Job associated to the execution of this function : " + request_helper.Job.id + "\n" +
            "> File/folder selected when launching              : ")
            
        for request_file in request_helper.Files:
            log_message = log_message + request_file.id + " "
        log_message = log_message + " \n"
            
        # You can also make your Function capable of receiving parameters from the user when launching
        # your function. You need to declare them in the "Parameters" tab and then you can retrieve them using 
        # their key:

        try:
            # If parameters are not defined by the user, an exception will be generated when trying to access them.
            # Include them in a try block to gracefully handle this scenario.
            log_message = (log_message + 
            "> Parameter 1 selected by the user                : " + str(request_helper.Parameters["params"]['parameter_1']) + " \n" + 
            "> Parameter 2 selected by the user                : " + str(request_helper.Parameters["params"]['parameter_2']) + " \n" +
            "> Parameter 3 selected by the user                : " + str(request_helper.Parameters["params"]['parameter_3']) + " \n")
            
        except Exception:
            log_message = (log_message + 
            "> Parameters selected by the user                 : NOT PROPERLY DEFINED." + " \n")
            
        finally:
            request_helper.log(log_message, JobMessage.JobMessageType.INFO)
        

        # If you want to see the raw structure that the platform sends to your function,
        # you can enable the following line. Be aware that you will be revealing root/admin
        # passwords in the log, which everyone with access to your container will be able
        # to see. Don't use it in production!
        
        #request_helper.log(str(event.body.decode("utf-8")), JobMessage.JobMessageType.INFO)
        
        
        # You have a (very small) disk space for your temp files. Be careful, as it can be erased at any time.
        #temp_file_path = opath.join(opath.join(tempfile.gettempdir(), "temp_assets"), "my_temp_file.txt")
        
        temp_file = tempfile.NamedTemporaryFile()
        with open(temp_file.name, 'w') as f:
            f.write("This is a line of text") 

        
        # You can then upload files to your container.
        # First get the S3 bucket your container is in.
        bucket = request_helper.Storage.extra_data["bucket"]
        
        # Next, get the container id that has been used to call your function
        container_id = request_helper.Container.id
        
        # And finally, upload your file
        S3File.upload_file(bucket, container_id, "/my_temp_file.txt", temp_file.name)
        
        # If your function needs to return files or reports to the user,
        # it is sometimes useful to create a shortcut to one or more files, so the user
        # can see them as part of the execution log. You can do it this way:
        # First, as assets are referenced by their file id, we need to obtain it. The
        # platform may need a few minutes to index the file, so we wait for it.
        # If after 5 minutes the file is not yet indexed, we log it and continue without
        # attaching it, so as not to enter an infinite loop if the platform is really busy.
        
        asset_file_id = None
        start_time = datetime.now()
        
        while asset_file_id is None :
            asset_file_id = File.get_by_path(container_id, "/my_temp_file.txt")
            time.sleep(1)
            time_delta = datetime.now() - start_time
            if time_delta.total_seconds() >= 300:
                request_helper.log("Asset file has been created and uploaded to the container, but not attached to the function result", 
                JobMessage.JobMessageType.ERROR)
                break
        
        # If everything went fine, you can associate it.
        if asset_file_id is not None:
            request_helper.job_asset_add(asset_file_id)

        
        # You can also create a summary of your process, that is shown in the header of the function
        # for the user to get a summary of the function execution.
        request_helper.log(
            "And this is a summary of your execution: \n" +
            " [*] Process 1:    " + u'\u2705' + "COMPLETED" + "\n" +
            " [*] Process 2:    " + u'\u2705' + "COMPLETED" + "\n" +
            " [*] Process 3:    " + u'\u274C' + "FAILED",
            message_type=JobMessage.JobMessageType.SUMMARY
        )

        # At the end, you should tell the platform that you have finished fine (True) or failed (False)
        request_helper.job_end(True)

    except Exception as e:
        if request_helper is not None:
            request_helper.log(
                "An unhandled exception has occured: \n" +
                repr(e) + "\n" +
                traceback.format_exc(),
                JobMessage.JobMessageType.ERROR
            )
            request_helper.job_end(False)

The logging calls near the top of the function (the "Hello world!" message, the error message and the context log) produce the following output in the function log:

The summary block near the end of the function produces the following output in the function log:

When something goes wrong in your code, the exception handler at the end of the function will run, producing output like this in the function log, which provides a hint about the cause:

If you want to test this function, simply create a new function, paste your code, provide a name and create the following parameters:

Save it, and you will see it appearing in your container's side bar:

Launching a function using the API

When a function trigger is ON_DEMAND, you can also launch it programmatically. For instance, to launch the previous function you first need the function id. To get the functions and their ids, you can use:

curl -s --request GET --url "$your_labdrive_url/api/functions/v2" \
     --header "Content-Type: application/json" \
     --header "authorization: Bearer $your_labdrive_api_key"

This will present the list of available functions:

Then, to launch it over a given file (see Search using the API to obtain the id of the file you want to run the function on), use:

curl -s --request POST \
     --url "$your_labdrive_url/api/container/7/function/v2/3" \
     --header "Content-Type: application/json" \
     --header "authorization: Bearer $your_labdrive_api_key" \
     --data '{
         "files": { "ids": [ 22343 ] },
         "params": {
             "parameter_1": "This is one parameter",
             "parameter_2": "22343",
             "parameter_3": "Option one"}
         }'

It will return the job id that the function execution has been assigned:

And finally, you can programmatically check the status of a job using:

curl  --request GET  --url "$your_labdrive_url/api/job/2961" \
         --header "Content-Type: application/json" \
         --header "authorization: Bearer $your_labdrive_api_key"

It is also possible to retrieve the function log using:

curl  --request GET  --url "$your_labdrive_url/api/job/2961/messages" \
         --header "Content-Type: application/json" \
         --header "authorization: Bearer $your_labdrive_api_key"bas

And its assets:

curl --request GET --url "$your_labdrive_url/api/job/2961/assets" \
     --header "Content-Type: application/json" \
     --header "authorization: Bearer $your_labdrive_api_key"
