Format for uploading a workflow with custom pipelet step

Hi,

I’m trying to use the new pipeline workflow API to upload a new pipeline workflow that includes a custom pipelet I built. I can’t find any example formats in the docs that describe what the value of type should be when the step is a pipelet, or how to specify the pipelet (is this name, key, or other?). What should this look like?

1 Like

Greetings - to start off, below is an example of a pipelet used in one of our sandbox projects:

Example Pipelet

{
  "id": "2m3m4ms5l2234-JpSA",
  "key": "sandbox_instance/Remove_Duplicates",
  "type": "pipelet",
  "config": { "pipelet": "sandbox_instance/Remove_Duplicates" },
  "config_options": [
    {
      "display_label": "Some Config Option",
      "help": "Sets a config option",
      "name": "some_config_option",
      "required": false,
      "type": "string"
    }
  ],
  "description": "A more detailed description of the pipelet",
  "linked_steps": ["Indexing"],
  "message": null,
  "name": "Remove_Duplicates",
  "precede_steps": [],
  "section": "enrich",
  "succeed_steps": [],
  "summary": "Removes duplicates from the source"
}

‘Type’ and ‘Key’

Before using the new_pipeline_workflow() method, I would like to assume that the pipelet has already been completed and uploaded to the instance. If the user attempts to create a pipeline workflow with a pipelet that is not uploaded, a 404 error will occur. The common method of uploading a pipelet is found here:

Pipelet Upload

The type field will be as follows: 'type': 'pipelet'

To specify which pipelet is used, we utilize the key field. This is taken from the id of the pipelet itself, which is not to be confused the id of the pipelet within the workflow. Put simply, you can find the id of the pipelet using the client’s get_pipelet() method. Please note that it follows the pattern of instance_name/pipelet_name

get_pipelet()

client.get_pipelet('textrazor')
{
  'id': 'tenant01/textrazor',
  'description': 'Entity extraction with `TextRazor`.',
  'name': 'textrazor',
  ...
}

Used in the context of new_workflow_pipeline(), the pipelet’s key will be written like this:
'key': 'tenant01/textrazor'

Minimum Required Info

Below is the minimal amount of properties to successfully include a custom pipelet in the new_workflow_pipeline() method:

Please note that config and config_options must be present, even if empty. Also note that a section must be specified, though this can be changed later. Information on available sections is found here:
Pipeline Sections

{
  "key": "sandbox_instance/Remove_Duplicates",
  "name": "Remove_Duplicates",
  "type": "pipelet",
  "section": "enrich",
  "config": {},
  "config_options": [],
}

We hope you find this information helpful as you continue in your interest using Squirro!
Please reach out if you have any further questions.

Sincerely,
Hubert Shon, Solutions Engineer

Thank you Hubert. Unfortunately I keep getting the same error from the client. Trying here with a dummy example, “Word Count”, which has the following response from my personal Squirro server:

client.get_pipelet('Word Count')
{'id': 'neil-ek/Word Count',
 'name': 'Word Count',
 'summary': '',
 'description': '',
 'description_html': '',
 'config_options': [{'default': '{\n}',
   'display_label': 'Configuration',
   'help': 'Pipelet configuration as a JSON dictionary.',
   'name': 'config',
   'syntax': 'json',
   'type': 'code'}],
 'pipeline_section': None,
 'source': '"""\nget the word count for an body\n"""\n\nfrom squirro.sdk import PipeletV1, require\n\n\n@require("log")\nclass WordCountPipelet(PipeletV1):\n    def __init__(self, config):\n\n        self.config = config\n\n    def consume(self, item):\n        """Main method run by the pipelet"""\n\n        item.setdefault("keywords", {})\n        self._enrich(item)\n\n        return item\n\n    def _enrich(self, item):\n        """count the words in the body"""\n\n        item_body = item.get("body")\n        item_body = item_body.split(" ")\n        item["keywords"]["word_count"] = [len(item_body)]\n',
 'data': None}

I put this in the steps section of my pipeline_workflow.json:

  {
      "key": "neil-ek/Word Count",
      "name": "Word Count",
      "type": "pipelet",
      "section": "enrich",
      "config": {},
      "config_options": [
          {
              "default": "{\n}",
              "display_label": "Configuration",
              "help": "Pipelet configuration as a JSON dictionary.",
              "name": "config",
              "syntax": "json",
              "type": "code"
          }
      ]
  }

I get this error from the client when trying to upload my workflow as “test”:

ClientError: (400, {'msg': "Pipeline workflow 'test' validation failed: Pipelet step 'Word Count' of type '' has error: the property key of a pipelet should be in the form '<tenant_name>/<pipelet_name>', not "}, {'error': {'msg': "Pipeline workflow 'test' validation failed: Pipelet step 'Word Count' of type '' has error: the property key of a pipelet should be in the form '<tenant_name>/<pipelet_name>', not "}})

I can’t figure out why.

Could you try changing the key value to hold single quotes instead of double quotes? I did run into this issue during my own testing and single quotes let me get past this error.

1 Like

Thanks Hubert - where do I put single quotes? I don’t see any quotes anywhere inside the keys or values.

1 Like

hi @nquinn
not sure if relevant, but these are the (adapted to your example above) keys/values I have in a similar pipeline workflow export:

{
    "key": "neil-ek/Word Count",
    "type": "pipelet",
    "name": "Word Count",
    "section": "enrich",
    "config": {
        "config": {},
        "pipelet": "neil-ek/Word Count"
    },
    "summary": "",
    "description": "",
    "config_options": [
        {
            "default": "{\n}",
            "display_label": "Configuration",
            "help": "Pipelet configuration as a JSON dictionary.",
            "name": "config",
            "syntax": "json",
            "type": "code"
        }
    ],
    "precede_steps": [],
    "succeed_steps": [],
    "message": null,
    "linked_steps": [
        "Indexing"
    ]
},

Note the additional “pipelet” key under config.

1 Like

@chiara adding the extra pipelet field in the config object did it! Sorry for the long response time, and thank you for the help.

2 Likes