Functions

Flexible Intake Functions let you run code in the platform in response to certain events or triggers, without needing any client-side scripting.

Functions are useful when you want the platform to behave in a specific way in response to internal or external events, or when you want to add your own code and execute it on demand.

With Flexible Intake Functions, you just upload your code and add the triggers on which you would like it to execute. Users can define data container-level lambda functions (platform Lambda functions) that are executed on certain events:

  • CRUD functions for files and metadata: When files are created, read, updated or deleted.

    E.g.: Every time you upload one of your photographs to a certain data container:

      • Extract the date from the file name into the metadata catalogue, so it is searchable using the web interface or the API (see the sketch after this list),

      • Calculate each file's integrity, and

      • Tag images that contain faces.

  • Periodic functions: Executed every minute, hour or day.

    E.g.: Webhooks to other systems

  • On-demand functions: Executed when the user selects files in the GUI, or when launched through the API.

    E.g.: For fetch.txt files containing a manifest of files to download, have Flexible Intake download them and place them inside the container.
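To make the first CRUD example above concrete, here is a minimal, hypothetical sketch of extracting a date from a photograph's file name with plain Python. The file-name convention and the regular expression are assumptions for illustration; writing the resulting value into the metadata catalogue would be done through the LIBNOVA API described below.

import re
from datetime import datetime

def extract_date_from_filename(filename):
    # Hypothetical file-name convention, e.g. "IMG_20240131_holiday.jpg"
    match = re.search(r"(\d{4})(\d{2})(\d{2})", filename)
    if match is None:
        return None
    year, month, day = (int(group) for group in match.groups())
    return datetime(year, month, day).date().isoformat()

# Returns "2024-01-31"; the value could then be stored in the metadata
# catalogue so it becomes searchable from the web interface or the API.
print(extract_date_from_filename("IMG_20240131_holiday.jpg"))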

A Python library that makes using the API easier and more convenient is available: library and its documentation. This library can be used in Functions, Jupyter Notebooks and in your own scripts.

How to create a Flexible Intake Function

  1. Go to Configuration and then to Functions.

  2. Select Add function.

  3. Give your function a meaningful name and description (both will be shown to users in the interface, so make sure you keep them short and concise). Enable it.

  4. In the Code tab, add the code you would like the platform to execute. Check the Functions code gallery to find examples. Introduce a "handler" that will be the unique short name for your function (a minimal skeleton is sketched after this list).

  5. Go to the triggers section and select the events on which you would like the function to execute.

  6. Go to the parameters section and define the parameters that the function will accept. These parameters only apply to on-demand functions, where the user will be asked to provide them when calling the function.

  7. Select Create.
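As a starting point for step 4, this is a minimal sketch of a function body. It only reuses the initialization, logging and Job calls documented in the next section; everything marked as "your own logic" is a placeholder.

#!/usr/bin/env python
# coding: utf-8

from libnova                           import com, Util
from libnova.com                       import Nuclio
from libnova.com.Nuclio                import Request
from libnova.com.Api                   import JobMessage

# `context` and `event` are provided by the platform on every execution
request_helper = com.Nuclio.Request.Request(context, event)

# Mark the execution Job as RUNNING
request_helper.job_init()

# ... your own logic goes here ...
request_helper.log("Function skeleton executed", JobMessage.JobMessageType.INFO)

# Mark the execution Job as COMPLETED (True) or FAILED (False)
request_helper.job_end(True)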

Function's code

We have created a Python library that simplifies many actions and makes your programming easier when creating a function: library and its documentation. It can also be used in Jupyter Notebooks and in your own scripts.

For example, let's say you would like to create a function that hashes your files with a new algorithm.

First, initialize your function by loading the LIBNOVA Flexible Intake libraries:

#!/usr/bin/env python
# coding: utf-8

import json
import hashlib

from libnova                           import com, Util
from libnova.com                       import Nuclio
from libnova.com.Nuclio                import Request
from libnova.com.Api                   import Driver, Container, File, Job, JobMessage
from libnova.com.Filesystem            import S3
from libnova.com.Filesystem.S3         import File as S3File, Storage

If your code is going to be called as a Flexible Intake Function, you will receive some parameters from Flexible Intake every time your function is called. The request helper that parses them is initialized in the following way:

request_helper = com.Nuclio.Request.Request(context, event)

Depending on the function type, the structure you receive can change, but it usually contains the following:

{
    "api": {
        "url": "http://go.libnova.com",
        "key_user": "1234567890abcdefghijklmnopqrstuvwxyz",
        "key_root": "1234567890abcdefghijklmnopqrstuvwxyz"
    },
    "function_data": {
        "container": {
            "id": "1"
        },
        "user": {
            "id": "1"
        },
        "files": {
            "ids":   [ ],
            "paths": [ ]
        },
        "job": {
            "id": "299"
        },
        "trigger": {
            "id": "0",
            "type": "",
            "regex": ""
        },
        "function": {
            "id": "0",
            "key": ""
        }
    },
    "function_params": {
        "your_custom_parameter": "custom_parameter_value"
    }
}
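For on-demand functions that define parameters, the values the user supplies arrive under function_params. As a sketch (assuming the structure above is delivered as the Nuclio event body, as in the full sample below; your_custom_parameter is just the placeholder name from the example), you could read them directly:

import json

# Parse the raw event body sent by the platform
request_data = json.loads(event.body.decode("utf-8"))

# Value provided by the user when calling the on-demand function
custom_value = request_data.get("function_params", {}).get("your_custom_parameter")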

Every function executes in relation to an (Execution) Job, which is really useful for logging execution progress. You should initialize it with:

# This will set the current function Job to the status "RUNNING"
request_helper.job_init()

And you can log to it using:

# This will write a new Job Message related with the current function Job
request_helper.log("Sample message", JobMessage.JobMessageType.INFO)

The JobMessage.JobMessageType defines the type of message. You can see a list of the available types in the method documentation.
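As shown in the hashing example further down, log() also accepts an optional file ID as a third argument, so the message is associated with a specific file of the current execution (request_file here stands for one of the files iterated over below):

# Attach the message to a specific file of the current execution
request_helper.log("Processing started", JobMessage.JobMessageType.INFO, request_file.id)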

Then comes your actual payload. In this example:

# This will iterate over all the files related with this function execution
for request_file in request_helper.Files:
    # This will retrieve the current function File metadata
    file_metadata = File.get_metadata(request_file.id, True)
    if file_metadata is not None:
        # We log the metadata
        request_helper.log(Util.format_json_item(file_metadata), JobMessage.JobMessageType.INFO)
    else:
        request_helper.log("File " + request_file.id + " has no metadata", JobMessage.JobMessageType.INFO)

    # This will retrieve a seekable S3 file stream that can be used like a native file stream reader
    file_stream = S3.File.get_stream(
        # The storage is needed to set the source bucket of the file
        request_helper.Storage,
        request_file
    )
    if file_stream is not None:
        file_hash_md5 = hashlib.md5()
        file_hash_sha1 = hashlib.sha1()
        file_hash_sha256 = hashlib.sha256()

        # Reading the stream in buffered blocks lets us feed several hash algorithms in a single pass
        file_data_stream_buffer = file_stream.read(8 * 1024 * 1024)
        while file_data_stream_buffer:
            file_hash_md5.update(file_data_stream_buffer)
            file_hash_sha1.update(file_data_stream_buffer)
            file_hash_sha256.update(file_data_stream_buffer)

            file_data_stream_buffer = file_stream.read(8 * 1024 * 1024)

        # We log some messages related to the result of the function
        request_helper.log("File hash calculated: MD5    - " + file_hash_md5.hexdigest(),
                           JobMessage.JobMessageType.INFO, request_file.id)
        request_helper.log("File hash calculated: SHA1   - " + file_hash_sha1.hexdigest(),
                           JobMessage.JobMessageType.INFO, request_file.id)
        request_helper.log("File hash calculated: SHA256 - " + file_hash_sha256.hexdigest(),
                           JobMessage.JobMessageType.INFO, request_file.id)

        # We can also store the calculated hashes in the database
        File.set_hash(request_file.id, "md5", file_hash_md5.hexdigest())
        File.set_hash(request_file.id, "sha1", file_hash_sha1.hexdigest())
        File.set_hash(request_file.id, "sha256", file_hash_sha256.hexdigest())

And finally, we must let Flexible Intake know that our function has finished, with the result status:

# This will finalize the current function Job
# The parameter is a boolean that determines if the function Job was successful or not
#
# If the parameter is True,  the result will be "COMPLETED",
# else,
# If the parameter is False, the result will be "FAILED"
request_helper.job_end(True)
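If your payload can fail, one possible pattern (a sketch using only the calls shown above; the message text is illustrative, and another JobMessageType level may fit errors better) is to wrap it so that the Job is reported as FAILED instead of being left as RUNNING:

request_helper.job_init()

try:
    # ... your payload goes here ...
    request_helper.job_end(True)
except Exception as error:
    # Record what went wrong in the Job messages and mark the Job as FAILED
    request_helper.log("Function failed: " + str(error), JobMessage.JobMessageType.INFO)
    request_helper.job_end(False)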

The full code sample:

#!/usr/bin/env python
# coding: utf-8

import json
import hashlib

from libnova                           import com, Util
from libnova.com                       import Nuclio
from libnova.com.Nuclio                import Request
from libnova.com.Api                   import Driver, Container, File, Job, JobMessage
from libnova.com.Filesystem            import S3
from libnova.com.Filesystem.S3         import File as S3File, Storage

# Initialize the Request parser
#
# This will automatically parse the data sent by the platform to this function, like the File ID,
# the Job ID, or the User ID who triggered this function.
#
# It will also initialize the API Driver using the user API Key
# Log the raw event body received from the platform before parsing it
context.logger.info(event.body.decode("utf-8"))
request_helper = com.Nuclio.Request.Request(context, event)

# This will set the current function Job to the status "RUNNING"
request_helper.job_init()

# This will write a new Job Message related with the current function Job
request_helper.log("Sample message", JobMessage.JobMessageType.INFO)

# This will iterate over all the files related with this function execution
for request_file in request_helper.Files:
    # This will retrieve the current function File metadata
    file_metadata = File.get_metadata(request_file.id, True)
    if file_metadata is not None:
        # We log the metadata
        request_helper.log(Util.format_json_item(file_metadata), JobMessage.JobMessageType.INFO)
    else:
        request_helper.log("File " + request_file.id + " has no metadata", JobMessage.JobMessageType.INFO)

    # This will retrieve a seekable S3 file stream that can be used like a native file stream reader
    file_stream = S3.File.get_stream(
        # The storage is needed to set the source bucket of the file
        request_helper.Storage,
        request_file
    )
    if file_stream is not None:
        file_hash_md5 = hashlib.md5()
        file_hash_sha1 = hashlib.sha1()
        file_hash_sha256 = hashlib.sha256()

        # Reading the stream in buffered blocks lets us feed several hash algorithms in a single pass
        file_data_stream_buffer = file_stream.read(8 * 1024 * 1024)
        while file_data_stream_buffer:
            file_hash_md5.update(file_data_stream_buffer)
            file_hash_sha1.update(file_data_stream_buffer)
            file_hash_sha256.update(file_data_stream_buffer)

            file_data_stream_buffer = file_stream.read(8 * 1024 * 1024)

        # We log some messages related to the result of the function
        request_helper.log("File hash calculated: MD5    - " + file_hash_md5.hexdigest(),
                           JobMessage.JobMessageType.INFO, request_file.id)
        request_helper.log("File hash calculated: SHA1   - " + file_hash_sha1.hexdigest(),
                           JobMessage.JobMessageType.INFO, request_file.id)
        request_helper.log("File hash calculated: SHA256 - " + file_hash_sha256.hexdigest(),
                           JobMessage.JobMessageType.INFO, request_file.id)

        # We can also store the calculated hashes in the database
        File.set_hash(request_file.id, "md5", file_hash_md5.hexdigest())
        File.set_hash(request_file.id, "sha1", file_hash_sha1.hexdigest())
        File.set_hash(request_file.id, "sha256", file_hash_sha256.hexdigest())

# This will finalize the current function Job
# The parameter is a boolean that determines if the function Job was successful or not
#
# If the parameter is True,  the result will be "COMPLETED",
# else,
# If the parameter is False, the result will be "FAILED"
request_helper.job_end(True)
