Elasticsearch Development

From Archivematica
Revision as of 12:24, 15 February 2019 by Jraddaoui (talk | contribs)

From Archivematica 0.9, AIP package information, such as METS data, is indexed using Elasticsearch (ES). This data can be searched from the Archival Storage area of the dashboard or can be interfaced with programmatically. For Elasticsearch administration information, such as how to delete an Elasticsearch index, please reference the administrator manual.

NB. From Archivematica 1.7.0, users can choose whether to index information using Elasticsearch, so the examples below might not work. This depends on how your Archivematica instance has been configured.

NB. In Archivematica 1.9.0, the Elasticsearch version support has been upgraded from ES 1.x to the 6.x version. Check this page if you're running that Archivematica version or higher.

Programmatic Access to indexed AIP data

To access indexed AIP data using a custom script or application, find an Elasticsearch API (Application Programming Interface) library for the programming language you are most comfortable with. In Archivematica we use Python with the library supported by Elasticsearch. Our developer documentation demonstrates how to use this library to access AIP data, but any language with an Elasticsearch client, such as PHP with Elastica, should work.

A list of officially supported and community supported libraries can be found on the Elasticsearch website.

Connecting to Elasticsearch (Archivematica 1.3+)

The following example will demonstrate access to the indexes using Elasticsearch's own Python library.

Importing the Elasticsearch API module

The first step is to import the Elasticsearch module and connect to the Elasticsearch server.

from __future__ import print_function
import sys

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import RequestError, NotFoundError

conn = Elasticsearch(['127.0.0.1:9200'])

NB. The additional module imports are used in this example so that the examples below can be copied-and-pasted as desired.
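
Because indexing is optional from Archivematica 1.7.0, a script may want to confirm that an index exists before querying it. The following is a minimal sketch, not part of Archivematica: `index_available` is a hypothetical helper name, and `conn` is assumed to be a client object exposing `indices.exists(index=...)`, as the Elasticsearch Python library does.

```python
def index_available(conn, index_name):
    """Return True if the named index exists on the connected server.

    `conn` is assumed to be an elasticsearch.Elasticsearch client, or any
    object exposing the same `indices.exists` call.
    """
    try:
        return bool(conn.indices.exists(index=index_name))
    except Exception:
        # In this sketch, connection problems count as "not available".
        return False
```

A script could call `index_available(conn, "aips")` and exit with a helpful message when it returns `False`.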

Full text searching

Once connected to Elasticsearch, you can perform searches. Below is the code needed to do a wildcard ('*') search for all indexed AIP files. We retrieve the first 20 items. Instead of providing a wildcard you could also supply keywords, such as a specific AIP UUID.

start_page = 1
items_per_page = 20

# ref: string query, https://git.io/vhgUw
wildcard_query = {
    "query": {
        "query_string": {
            "query": "*",
        },
    },
}

try:
    results = conn.search(
        body=wildcard_query,
        index="aips",
        doc_type="aip",
        # from_ is a zero-based item offset, not a page number
        from_=(start_page - 1) * items_per_page,
        size=items_per_page,
    )
except RequestError:
    print("Query error")
    sys.exit(1)
except NotFoundError:
    # Raised on a 404 response, for example when the index does not exist
    print("Index not found")
    sys.exit(1)
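
To request pages beyond the first, the 1-based page number has to be converted into the zero-based item offset that `from_` expects. A minimal sketch of a paging helper follows; `page_offset` and `search_page` are hypothetical names, and `conn` is assumed to be the client created earlier.

```python
def page_offset(page, items_per_page):
    """Convert a 1-based page number into a zero-based result offset."""
    if page < 1 or items_per_page < 1:
        raise ValueError("page and items_per_page must be >= 1")
    return (page - 1) * items_per_page


def search_page(conn, query, index, doc_type, page=1, items_per_page=20):
    """Run `query` and return the requested page of the response."""
    return conn.search(
        body=query,
        index=index,
        doc_type=doc_type,
        from_=page_offset(page, items_per_page),
        size=items_per_page,
    )
```

For example, `search_page(conn, wildcard_query, "aips", "aip", page=2)` would request items 21 to 40.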

There are a number of ways to construct Elasticsearch queries. The Elasticsearch website provides useful reference material: Elasticsearch Full Text Queries.
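
For instance, a keyword search only changes the string passed to `query_string`. The sketch below builds such a query from one or more keywords. `keyword_query` is a hypothetical helper, and wrapping each keyword in double quotes (so that hyphenated values such as UUIDs are treated as phrases) is an assumption about how the data was analyzed, not a documented Archivematica convention.

```python
def keyword_query(*keywords):
    """Build a query_string query that requires every keyword to match."""
    if not keywords:
        # With no keywords, fall back to the match-everything wildcard.
        return {"query": {"query_string": {"query": "*"}}}
    # Escape backslashes and embedded quotes, then quote each keyword.
    quoted = [
        '"{}"'.format(k.replace("\\", "\\\\").replace('"', '\\"'))
        for k in keywords
    ]
    return {"query": {"query_string": {"query": " AND ".join(quoted)}}}
```

The returned dictionary can be passed as the `body` argument of a search in place of the wildcard query.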

Querying for specific data

While the string query-type is good for broad searches, you may want to narrow a search down to a specific field of data to reduce false positives. Below is an example of searching documents, using a "term" query to match criteria within specific data.

start_page = 1
items_per_page = 20

# ref: term query, https://git.io/vhrI9
term_query = {
    "query": {
        "constant_score": {
            "filter": {
                "term": {
                    "mets.ns0:mets_dict_list.ns0:amdSec_dict_list.@ID":
                    "amdsec_8"
                }
            }
        }
    }
}

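try:
    results = conn.search(
        body=term_query,
        index="aips",
        doc_type="aip",
        # from_ is a zero-based item offset, not a page number
        from_=(start_page - 1) * items_per_page,
        size=items_per_page,
    )
except RequestError:
    print("Query error")
    sys.exit(1)
except NotFoundError:
    # Raised on a 404 response, for example when the index does not exist
    print("Index not found")
    sys.exit(1)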

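Note that constructing the query is not straightforward. With the default analyzer, Elasticsearch lowercases values when it indexes them, and a term query is not analyzed, so the value searched for must also be lowercase. Field names, by contrast, are case-sensitive and must be written exactly as they were indexed. You can check how a value is analyzed with a `curl` statement along the lines of: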

curl -X GET "http://127.0.0.1:9200/_analyze?pretty=true" \
-H 'Content-Type: application/json' -d'
{
  "field": "METS.ns0:mets_dict_list.ns0:amdSec_dict_list.@ID",
  "text": "amdSec_8"
}'

The result is an analysis of the query string we want to use. It can reveal mistaken assumptions, for example, searching for `amdSec_8` as a mixed-case string.
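
One way to guard against the mixed-case mistake is to build term queries through a small helper that lowercases the value and leaves the field name untouched. This is a sketch only: `term_query_for` is a hypothetical name, and lowercasing is appropriate only for fields indexed with a lowercasing analyzer, which is assumed here.

```python
def term_query_for(field, value):
    """Build a constant_score term query with a lowercased value.

    The field name is passed through unchanged; only the value is lowercased.
    """
    return {
        "query": {
            "constant_score": {
                "filter": {"term": {field: value.lower()}}
            }
        }
    }
```

Calling `term_query_for("mets.ns0:mets_dict_list.ns0:amdSec_dict_list.@ID", "amdSec_8")` produces a query whose term value is `amdsec_8`.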

Displaying search results

Now that you have performed a couple of searches, you can display some results. The logic below cycles through each hit in a result set. For each AIP, its UUID and file path are printed to the console.

hits = results.get("hits")
if hits is not None:
    print("Total results:", hits.get("total"))
    records = hits.get("hits", [])
    print("Results returned:", len(records))
    for record in records:
        # record is the complete AIP record as a Python dict
        source = record.get("_source")
        if source:
            print("AIP ID: {}".format(source.get("uuid")))
            print("Filepath: {}".format(source.get("filePath")))
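
The same traversal can be wrapped in a function that returns the values instead of printing them, which is easier to reuse and to test. The sketch below runs against a hand-written response. The sample values are invented for illustration, and `summarize_hits` is a hypothetical name.

```python
def summarize_hits(results):
    """Return (total, [(uuid, filePath), ...]) from a search response dict."""
    hits = results.get("hits") or {}
    total = hits.get("total", 0)
    if isinstance(total, dict):
        # Newer Elasticsearch versions report {"value": n, "relation": ...}.
        total = total.get("value", 0)
    rows = []
    for record in hits.get("hits", []):
        source = record.get("_source") or {}
        rows.append((source.get("uuid"), source.get("filePath")))
    return total, rows


# Invented sample response, shaped like the output of conn.search().
sample = {
    "hits": {
        "total": 1,
        "hits": [
            {"_source": {"uuid": "example-uuid", "filePath": "/tmp/example.7z"}}
        ],
    }
}
total, rows = summarize_hits(sample)
print("Total results:", total)
for uuid, file_path in rows:
    print("AIP ID: {} / Filepath: {}".format(uuid, file_path))
```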

Fetching specific documents

The AIP index inside Archivematica is separated into two Elasticsearch document types: the AIP as a whole, and its individual files.

  • aip
  • aipfile

It might be easier to retrieve information about a specific digital object if you know its UUID and query the `aipfile` document type instead. The example below shows how we might retrieve the format identification for the file with the UUID f7428196-a11b-4093-b311-d43d607d54ca:

from __future__ import print_function
import sys

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import RequestError, NotFoundError

conn = Elasticsearch(["http://127.0.0.1:9200"])

start_page = 1
items_per_page = 20

term_query = {
    "query": {
        "constant_score": {
            "filter": {
                "term": {"FILEUUID": "f7428196-a11b-4093-b311-d43d607d54ca"}
            }
        }
    }
}

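try:
    results = conn.search(
        body=term_query,
        index="aips",
        doc_type="aipfile",
        # from_ is a zero-based item offset, not a page number
        from_=(start_page - 1) * items_per_page,
        size=items_per_page,
    )
except RequestError:
    print("Query error")
    sys.exit(1)
except NotFoundError:
    # Raised on a 404 response, for example when the index does not exist
    print("Index not found")
    sys.exit(1)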

res = results.get("hits")
if res is not None:
    for res_ in res.get("hits"):
        file_record = res_.get("_source")
        if file_record:
            try:
                puid = file_record["METS"]["amdSec"] \
                ["ns0:amdSec_dict_list"][0]  \
                ["ns0:techMD_dict_list"][0]  \
                ["ns0:mdWrap_dict_list"][0]  \
                ["ns0:xmlData_dict_list"][0] \
                ["ns1:object_dict_list"][0]  \
                ["ns1:objectCharacteristics_dict_list"][0] \
                ["ns1:format_dict_list"][0]  \
                ["ns1:formatRegistry_dict_list"][0] \
                ["ns1:formatRegistryKey"]
                print("Format ID: {}".format(puid))
            except (KeyError, IndexError):
                print("Problem accessing index.")
                sys.exit(1)
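
Long chains of keys and list indexes like the one above are easy to mistype. A small path-walking helper is one alternative. The sketch below is hypothetical (`dig` is not part of Archivematica or the Elasticsearch library) and is demonstrated on an invented, much shorter record.

```python
def dig(record, path, default=None):
    """Follow a sequence of dict keys and list indexes into nested data.

    Returns `default` as soon as any step is missing.
    """
    current = record
    for step in path:
        try:
            current = current[step]
        except (KeyError, IndexError, TypeError):
            return default
    return current


# Invented record with the same dict/list nesting style as the index.
record = {"METS": {"amdSec": {"ns0:amdSec_dict_list": [{"@ID": "amdSec_1"}]}}}
print(dig(record, ["METS", "amdSec", "ns0:amdSec_dict_list", 0, "@ID"]))
print(dig(record, ["METS", "missing", 0], default="not found"))
```

With a helper like this, the format lookup becomes a single list of steps, and a missing element yields the default instead of an exception.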

Archivematica Transfer Index

Information about Transfers is also indexed in Archivematica. The information is less rich than what is stored for the AIP, so it is not covered in detail here. We can use the wildcard query from the AIP examples above to begin to look at what is in the transfer index. The query would look as follows:

from __future__ import print_function
import sys

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import RequestError, NotFoundError

conn = Elasticsearch(["http://127.0.0.1:62002"])

start_page = 1
items_per_page = 20

# ref: string query, https://git.io/vhgUw
wildcard_query = {
    "query": {
        "query_string": {
            "query": "*",
        },
    },
}

try:
    results = conn.search(
        body=wildcard_query,
        index="transfers",
        doc_type="transfer",
        # from_ is a zero-based item offset, not a page number
        from_=(start_page - 1) * items_per_page,
        size=items_per_page,
    )
except RequestError:
    print("Query error")
    sys.exit(1)
except NotFoundError:
    # Raised on a 404 response, for example when the index does not exist
    print("Index not found")
    sys.exit(1)

res = results.get("hits")
if res is not None:
    for res_ in res.get("hits"):
        print(res_.get("_source"))
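
To get an overview of which fields a transfer record contains without reading the whole document, the nested keys can be flattened into dotted paths. This is a sketch on an invented record: `field_paths` is a hypothetical helper, and the field names in `sample` are illustrative only.

```python
def field_paths(value, prefix=""):
    """Return a sorted list of dotted key paths found in nested dicts/lists."""
    paths = set()
    if isinstance(value, dict):
        for key, child in value.items():
            path = "{}.{}".format(prefix, key) if prefix else key
            paths.add(path)
            paths.update(field_paths(child, path))
    elif isinstance(value, list):
        # Items in a list share the path of the list itself.
        for child in value:
            paths.update(field_paths(child, prefix))
    return sorted(paths)


# Invented record; real transfer documents will have different fields.
sample = {"name": "example", "files": [{"path": "a.txt"}, {"size": 1}]}
print(field_paths(sample))
```

Applied to each `_source` dictionary in the results, this gives a compact list of the fields available for querying.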

Further reading

Elasticsearch provides API functions beyond searching. Users who wish to make use of these capabilities in Python can look at the Python library documentation.

The complete Elasticsearch documentation reference is also available. Many of the commands are described with examples that can be run using the curl command line tool.

Connecting to Elasticsearch (Archivematica 0.9 up to Archivematica 1.2)

Here we will run through an example of interfacing with older versions of Archivematica using Elasticsearch with a Python script that leverages the pyes library.

NB. Pyes use was removed in Archivematica 1.3, though with some modification it should still be possible to adapt the examples below to query the ES indexes.

Importing the pyes module

The first step, when using pyes, is to import the module. The following code imports pyes functionality on a system on which Archivematica is installed.

import sys
sys.path.append("/home/demo/archivematica/src/archivematicaCommon/lib/externals")
from pyes import *

Next you'll want to create a connection to Elasticsearch.

conn = ES('127.0.0.1:9200')

Full text searching

Once connected to Elasticsearch, you can perform searches. Below is the code needed to do a "wildcard" search for all AIP files indexed by Elasticsearch and retrieve the first 20 items. Instead of doing a "wildcard" search you could also supply keywords, such as a certain AIP UUID.

start_page     = 1
items_per_page = 20

q = StringQuery('*')

try:
    results = conn.search_raw(
        query=q,
        indices='aips',
        type='aip',
        start=(start_page - 1) * items_per_page,
        size=items_per_page
    )
except Exception:
    print 'Query error.'
    sys.exit(1)

Querying for specific data

While the "StringQuery" query type is good for broad searches, you may want to narrow a search down to a specific field of data to reduce false positives. Below is an example of searching documents using "TermQuery" to match criteria within specific data. Because Elasticsearch stores term values in lowercase by default, the term value searched for must also be lowercase.

import sys
sys.path.append("/usr/lib/archivematica/archivematicaCommon/externals")
import pyes

conn = pyes.ES('127.0.0.1:9200')

q = pyes.TermQuery("METS.amdSec.ns0:amdSec_list.@ID", "amdsec_8")

try:
    results = conn.search_raw(query=q, indices='aips')
except Exception:
    print 'Query failed.'
    sys.exit(1)

Displaying search results

Now that you've performed a couple of searches, you can display some results. The logic below cycles through each hit in a result set, each representing an AIP file, and prints the UUID of the AIP the file belongs to, the Elasticsearch document ID corresponding to the indexed file data, and the path of the file within the AIP.

if results:
    document_ids = []
    for item in results.hits.hits:
        aip = item._source
        print 'AIP ID: ' + aip['AIPUUID'] + ' / Document ID: ' + item._id
        print 'Filepath: ' + aip['filePath']
        print
        document_ids.append(item._id)

Fetching specific documents

If you want to get Elasticsearch data for a specific AIP file, you can use the Elasticsearch document ID. The code above populates the document_ids list, and the code below uses it to retrieve individual documents and extract a specific item of data from each one.

# Assumed to match the index and document type used in the searches above
index_name = 'aips'
type_name = 'aip'

for document_id in document_ids:
    data = conn.get(index_name, type_name, document_id)

    # Walk the nested METS structure down to the format name
    format_name = data['METS']['amdSec'] \
    ['ns0:amdSec_list'][0]  \
    ['ns0:techMD_list'][0]  \
    ['ns0:mdWrap_list'][0]  \
    ['ns0:xmlData_list'][0] \
    ['ns1:object_list'][0]  \
    ['ns1:objectCharacteristics_list'][0] \
    ['ns1:format_list'][0]  \
    ['ns1:formatDesignation_list'][0] \
    ['ns1:formatName']

    print 'Format for document ID ' + document_id + ' is ' + format_name

Augmenting documents

To add additional data to an Elasticsearch document, you will need the document ID. The following code shows an Elasticsearch query being used to find a document and update it with additional data. Note that the name of the data field being added, "__public", is prefixed with two underscores. This practice prevents the accidental overwriting of system or Archivematica-specific data. System data is prefixed with a single underscore.

import sys
sys.path.append("/usr/lib/archivematica/archivematicaCommon/externals")
import pyes

conn = pyes.ES('127.0.0.1:9200')

q = pyes.TermQuery("METS.amdSec.ns0:amdSec_list.@ID", "amdsec_8")

try:
    results = conn.search_raw(query=q, indices='aips')
    if results:
        for item in results.hits.hits:
            print 'Updating ID: ' + item['_id']

            # Add the custom field and re-index under the same document ID
            document = item['_source']
            document['__public'] = 'yes'
            conn.index(document, 'aips', 'aip', item['_id'])
except Exception:
    print 'Query or update failed.'
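
The double-underscore naming convention can also be enforced in code, so that a custom field never collides with system or Archivematica-specific data. The following sketch is written for illustration and is not part of pyes or Archivematica. `add_custom_fields` is a hypothetical helper that works on the plain document dictionary before it is re-indexed.

```python
def add_custom_fields(document, fields):
    """Return a copy of `document` with the custom fields added.

    Every custom field name must start with two underscores.
    """
    updated = dict(document)
    for name, value in fields.items():
        if not name.startswith("__"):
            raise ValueError(
                "custom field {!r} must start with '__'".format(name)
            )
        updated[name] = value
    return updated
```

For example, `add_custom_fields(document, {"__public": "yes"})` returns the document with the extra field, while a name such as `_id` is rejected.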