How to use the job engine for bulk tasks


The job engine is designed to run many jobs in parallel as fast as possible. In this tutorial we calculate the MD5 hash of every file in a directory that contains many files.
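For orientation, here is the result the tutorial is after, computed sequentially with nothing but the JDK: a map from file path to base64-encoded MD5 hash. This is a baseline sketch, not how the job engine works. It uses `java.security.MessageDigest` and `java.util.Base64` in place of the kernel's `CUtilMd5`, assumes standard RFC 4648 base64 (which may or may not match `CMd5.toBase64()`), and the class name `SequentialDirHasher` is made up for illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sequential baseline: no job engine, one thread, same result shape.
final class SequentialDirHasher {

    // MD5 of one file, streamed in chunks so large files are not loaded into memory.
    static String md5Base64(Path file) throws IOException {
        final MessageDigest md;
        try {
            md = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 must be available on every Java platform", e);
        }
        try (InputStream in = Files.newInputStream(file)) {
            final byte[] buffer = new byte[8192];
            int read;
            while ((read = in.read(buffer)) != -1) {
                md.update(buffer, 0, read);
            }
        }
        // Assumption: standard RFC 4648 base64 with padding.
        return Base64.getEncoder().encodeToString(md.digest());
    }

    // Walks the directory recursively and hashes every regular file.
    static Map<String, String> hashDirectory(Path dir) throws IOException {
        final List<Path> files;
        try (Stream<Path> walk = Files.walk(dir)) {
            files = walk.filter(Files::isRegularFile).collect(Collectors.toList());
        }
        final Map<String, String> result = new HashMap<>();
        for (Path file : files) {
            result.put(file.toString(), md5Base64(file));
        }
        return result;
    }
}
```

Conceptually, the parallel version in this tutorial replaces the loop in `hashDirectory` with one job per file that runs on one of the engine's threads.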

The example is somewhat long because it covers several topics:

  • The definition of dependencies
  • The service starter
  • How a nanoservice works
  • Delegating each request to a working target so that several requests can be processed in parallel
  • The initialization of the job engine
  • The job itself
  • The orderly shutdown of the engine, both on success and on cancellation

The service target

To make the exercise easier to follow, we register a nanoservice. A client can then have the files in a directory hashed simply by sending a message to this nanoservice.

The nanoservice is provided by a target. Depending on the size and number of files, generating the hashes may take some time. Therefore the service target delegates each request to a separate working target, so that it is not blocked and can keep accepting new requests.
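The extra working target follows the usual rule of keeping the request handler free. As a plain-Java analogy only (this is not the nyssr.net API), the hypothetical `DelegatingService` below hands every request to a worker and returns a `CompletableFuture` immediately, so a slow request does not delay the next one.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical analogue of the service target: it never does the slow work itself.
final class DelegatingService implements AutoCloseable {
    // Each request gets its own worker, similar to one CWorkingTarget per request.
    private final ExecutorService workers = Executors.newCachedThreadPool();

    // Returns at once; the caller learns the result through the future.
    <R> CompletableFuture<R> handle(String request, Function<String, R> work) {
        return CompletableFuture.supplyAsync(() -> work.apply(request), workers);
    }

    @Override
    public void close() {
        workers.shutdown();
    }
}
```

A cached pool is used so that each request really gets its own thread; the job engine, by contrast, bounds the work of a single request with a fixed set of threads.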

This is our service target.

class CFileHashCalculator extends CTarget implements IService
{
    private final IDependencies mDependencies;

    CFileHashCalculator(@NotNull final IDependencies aDependencies)
    {
        mDependencies = aDependencies;

        addMessageHandler(CRecordStartTarget.ID,
                          this::asyncStartTarget);
        addMessageHandler(CRecordHashMyDir.ID,
                          this::asyncHashMyDir);
    }

    @Override
    public void activate(@NotNull final IServiceRegistry aServiceRegistry) throws Exception
    {
        // This namespace will always exist
        // (1)
        final INamespace namespace = mDependencies.getNamespaceFactory()
                                                  .getNamespace(CWellKnownNID.SYSTEM);
        assert namespace != null;
        namespace.getTargetRegistry()
                 .registerTarget(this);
    }

    @Override
    public void deactivate(@NotNull final IServiceRegistry aServiceRegistry) throws Exception
    {
        // Remove this target as observer of the nanoservice CRecordHashMyDir
        // in the system namespace. This namespace always exists.
        // The system does this automatically; it is shown here for completeness.
        // (5)
        final INamespace system = mDependencies.getNamespaceFactory()
                                               .getNamespace(CWellKnownNID.SYSTEM);
        assert system != null;
        system.getNanoServiceRegistry()
              .removeObserver(CRecordHashMyDir.class,
                              getAddress());
        deregisterTarget();
    }

    private boolean asyncStartTarget(@NotNull final CEnvelope aEnvelope,
                                     @NotNull final CRecord aRecord) throws CException
    {
        if (aEnvelope.isAnswer())
        {
            return false;
        }
        else
        {
            // add nano service CRecordHashMyDir to system namespace
            // (2)
            final INamespace system = mDependencies.getNamespaceFactory()
                                                   .getNamespace(CWellKnownNID.SYSTEM);
            assert system != null;
            system.getNanoServiceRegistry()
                  .addNanoServiceAndObserver(CRecordHashMyDir.class,
                                             getAddress(),
                                             false);

            aEnvelope.setResultSuccess();
            return true;
        }
    }

    private boolean asyncHashMyDir(@NotNull final CEnvelope aEnvelope,
                                   @NotNull final CRecord aRecord) throws CException
    {
        if (aEnvelope.isAnswer())
        {
            return false;
        }
        else
        {
            // (3) get path name of directory from message
            final String pathname = CRecordHashMyDir.getPath(aRecord,
                                                             null);
            if (pathname == null)
            {
                // error: send back
                aEnvelope.setResult(CResultCode.INVALID_ARGUMENT,
                                    "path is null");
            }
            else
            {
                final Path path = Paths.get(pathname);
                final File dir = path.toFile();
                if (!dir.exists() || !dir.isDirectory())
                {
                    // error: send back
                    aEnvelope.setResult(CResultCode.NO_DIRECTORY,
                                        "no directory");
                }
                else
                {
                    // Each request is processed by an additional working target
                    // (4)
                    final CWorkingTarget tgt = new CWorkingTarget(mDependencies,
                                                                  aEnvelope,
                                                                  aRecord);
                    getTargetRegistry().registerTarget(tgt);
                }
            }
            return true;
        }
    }
}
  • 1 Registration of the target in the activate() method
  • 2 Registration of the nanoservice in the SYSTEM namespace, with this target as its observer
  • 3 Parsing of the CRecordHashMyDir message
  • 4 Creation and registration of the working target
  • 5 Deregistration of the observer (not required)
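The checks in callout 3 decide whether a request is answered at once with an error or handed to a working target. Isolated from the messaging code, that decision can be sketched as follows; the `Outcome` enum is a hypothetical stand-in for the `CResultCode` values, and `Files.isDirectory` covers both the `exists()` and the `isDirectory()` test of the original.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical mirror of the validation in asyncHashMyDir.
final class HashRequestValidator {

    // Stand-in for the result codes used in the tutorial.
    enum Outcome { ACCEPTED, INVALID_ARGUMENT, NO_DIRECTORY }

    static Outcome validate(String pathname) {
        if (pathname == null) {
            return Outcome.INVALID_ARGUMENT;   // "path is null"
        }
        final Path path = Paths.get(pathname);
        if (!Files.isDirectory(path)) {
            return Outcome.NO_DIRECTORY;       // missing, or not a directory
        }
        return Outcome.ACCEPTED;               // would be handed to a working target
    }
}
```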

The working target

The working target processes a single request. It creates the job engine and the engine's threads, then walks the requested directory recursively and creates one job per file, which it passes to the engine.
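In plain Java, an engine with ten LOW-priority threads is roughly comparable to a fixed thread pool whose thread factory names the threads and lowers their priority. This is an analogy with assumptions: `Thread.MIN_PRIORITY` is only a hint to the scheduler, whereas the tutorial requires a job's `EThreadPriority` to match the priority of the engine threads. `LowPriorityPool` and the `name_i` thread names are illustrative.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical analogue of createEngine(NAME) + 10 x appendThread(..., LOW, ...) + start().
final class LowPriorityPool {

    static ExecutorService create(String name, int threads) {
        final AtomicInteger counter = new AtomicInteger();
        final ThreadFactory factory = runnable -> {
            final Thread t = new Thread(runnable, name + "_" + counter.getAndIncrement());
            t.setPriority(Thread.MIN_PRIORITY); // scheduler hint only
            t.setDaemon(true);                  // do not keep the JVM alive
            return t;
        };
        return Executors.newFixedThreadPool(threads, factory);
    }
}
```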

class CWorkingTarget extends CTarget
{
    private static final String NAME = "HashCalculator";
    private final IDependencies mDependencies;
    private final CEnvelope mEnvelope;
    private final CRecord mRecord;

    private final Map<String, String> mResult = new HashMap<>();
    private IJobEngine mEngine;

    CWorkingTarget(@NotNull final IDependencies aDependencies,
                   @NotNull final CEnvelope aEnvelope,
                   @NotNull final CRecord aRecord)
    {
        mDependencies = aDependencies;
        // (1)
        mEnvelope = aEnvelope;
        mRecord = aRecord;

        mEnvelope.setBlocked(true);

        addMessageHandler(CRecordStartTarget.ID,
                          this::asyncStartTarget);
        addMessageHandler(CRecordSetHash.ID,
                          this::asyncSetHash);
        addMessageHandler(CRecordNoMoreJobs.ID,
                          this::asyncNoMoreJobs);
        addMessageHandler(CRecordDismiss.ID,
                          this::asyncDismiss);
    }

    private boolean asyncStartTarget(@NotNull final CEnvelope aEnvelope,
                                     @NotNull final CRecord aRecord) throws CException
    {
        if (aEnvelope.isAnswer())
        {
            return false;
        }
        else
        {
            // create job engine
            // (2)
            final String path = CRecordHashMyDir.getPath(mRecord,
                                                         null);

            mEngine = mDependencies.getJobEngineFactory()
                                   .createEngine(NAME);
            mEngine.setOwner(getAddress());
            for (int i = 0; i < 10; i++)
            {
                mEngine.appendThread("_" + i,
                                     EThreadPriority.LOW,
                                     1);
            }
            mEngine.start();

            addJobs(Paths.get(path));

            aEnvelope.setResultSuccess();
            return true;
        }
    }

    private boolean asyncSetHash(@NotNull final CEnvelope aEnvelope,
                                 @NotNull final CRecord aRecord)
    {
        if (aEnvelope.isAnswer())
        {
            return false;
        }
        else
        {
            // from a job target: path and hash for a file
            // (4)
            final String path = CRecordSetHash.getPath(aRecord,
                                                       null);
            final String hash = CRecordSetHash.getHash(aRecord,
                                                       null);
            if (path != null && hash != null)
            {
                mResult.put(path,
                            hash);
            }
            return true;
        }
    }

    private boolean asyncNoMoreJobs(@NotNull final CEnvelope aEnvelope,
                                    @NotNull final CRecord aRecord) throws CException
    {
        if (aEnvelope.isAnswer())
        {
            return false;
        }
        else
        {
            // finish with success
            // (5)
            finish(CResultCode.SUCCESS,
                   "");
            return true;
        }
    }

    private boolean asyncDismiss(@NotNull final CEnvelope aEnvelope,
                                 @NotNull final CRecord aRecord) throws CException
    {
        if (aEnvelope.isAnswer())
        {
            return false;
        }
        else
        {
            // Finish with cancel
            // (6)
            finish(CResultCode.CANCEL,
                   "Dismissed");
            return true;
        }
    }

    private void addJobs(final Path incomingPath)
    {
        final CTargetAddress address = getAddress();
        try
        {
            Files.walkFileTree(incomingPath,
                               new FileVisitor<Path>()
                               {
                                   @Override
                                   public FileVisitResult preVisitDirectory(final Path aDir,
                                                                            final BasicFileAttributes attrs)
                                   {
                                       return FileVisitResult.CONTINUE;
                                   }

                                   @Override
                                   public FileVisitResult visitFile(final Path aPath,
                                                                    final BasicFileAttributes attrs)
                                   {
                                       // (3) Create and add a job for each regular file
                                       if (attrs.isRegularFile())
                                       {
                                           final IJob job = new CJobHashFile(aPath,
                                                                             address);
                                           mEngine.appendJob(job);
                                       }
                                       return FileVisitResult.CONTINUE;
                                   }

                                   @Override
                                   public FileVisitResult visitFileFailed(final Path file,
                                                                          final IOException exc)
                                   {
                                       return FileVisitResult.CONTINUE;
                                   }

                                   @Override
                                   public FileVisitResult postVisitDirectory(final Path dir,
                                                                             final IOException exc)
                                   {
                                       return FileVisitResult.CONTINUE;
                                   }
                               });
        }
        catch (final IOException aE)
        {
            aE.printStackTrace();
        }
    }

    private void finish(final int aCode,
                        final String aText) throws CException
    {
        // dismiss engine
        // (7)
        if (mEngine != null)
        {
            mEngine.dismiss();
        }

        // Enter the data in the request
        // (8)
        if (aCode == CResultCode.SUCCESS && mResult != null)
        {
            // create two string arrays, the first with path entries, the second with hashes
            final int size = mResult.size();
            final String[] paths = new String[size];
            final String[] hashes = new String[size];
            int index = 0;
            for (final Map.Entry<String, String> entry : mResult.entrySet())
            {
                paths[index] = entry.getKey();
                hashes[index] = entry.getValue();
                index++;
            }

            CRecordHashMyDir.setPathArray(mRecord,
                                          paths);
            CRecordHashMyDir.setMd5Array(mRecord,
                                         hashes);
        }

        // send results back to requester
        // (9)
        mEnvelope.setResult(aCode,
                            aText);
        mEnvelope.setBlocked(false);
        getMessageSender().sendBack(mEnvelope,
                                    mRecord);

        // The work is finished
        // (10)
        deregisterTarget();
    }
}
  • 1 The envelope and the record of the original request are saved. The envelope is marked as blocked so that the system does not return it to the client automatically; we return it manually once the jobs are done.
  • 2 The job engine is created, 10 threads with priority LOW are added, and the engine is started. Then the jobs are added. Attention: the priority of the jobs must match that of the threads.
  • 3 The directory is searched recursively. For each regular file found, a job is created and added to the engine. Besides the path of the file, the job receives the address of the working target so that it can send the hash back once it has been calculated.
  • 4 A job has sent us the hash of a file via message.
  • 5 No more jobs are in progress. We shut down the job engine and send the result back.
  • 6 A CRecordDismiss message causes processing to be aborted.
  • 7 The engine is terminated.
  • 8 On success, the hash data is packed into the message.
  • 9 The message is returned manually.
  • 10 The working target is deregistered.
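The packing step in `finish()` relies on keeping the two arrays index-aligned: `paths[i]` and `hashes[i]` always describe the same file, and nothing is attached on cancellation. The sketch below isolates that logic in plain Java; `HashReply` is a hypothetical stand-in for the envelope result plus the `PATH_ARRAY` and `MD5_ARRAY` slots of `CRecordHashMyDir`.

```java
import java.util.Map;

// Hypothetical stand-in for the answer written into CRecordHashMyDir by finish().
final class HashReply {
    final boolean success;
    final String text;
    final String[] paths;   // PATH_ARRAY slot, null unless success
    final String[] hashes;  // MD5_ARRAY slot, null unless success

    private HashReply(boolean success, String text, String[] paths, String[] hashes) {
        this.success = success;
        this.text = text;
        this.paths = paths;
        this.hashes = hashes;
    }

    // Success: copy the map into two arrays that share the same index for each file.
    static HashReply success(Map<String, String> result) {
        final String[] paths = new String[result.size()];
        final String[] hashes = new String[result.size()];
        int index = 0;
        for (Map.Entry<String, String> entry : result.entrySet()) {
            paths[index] = entry.getKey();
            hashes[index] = entry.getValue();
            index++;
        }
        return new HashReply(true, "", paths, hashes);
    }

    // Cancel (CRecordDismiss): report the reason, attach no partial data.
    static HashReply cancelled(String reason) {
        return new HashReply(false, reason, null, null);
    }
}
```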

The job target

The actual job class is derived from the abstract class CAbstractJob. Its structure is simple: the work is done in the handler for the start message, which runs in the engine thread with which the engine registered the job target.

class CJobHashFile extends CAbstractJob
{
    private static final ILogger LOG = CLoggerFactory.getLogger(CJobHashFile.class);
    private final Path mPath;
    private final CTargetAddress mParent;

    CJobHashFile(@NotNull final Path aPath,
                 @NotNull final CTargetAddress aParent)
    {

        mPath = aPath;
        mParent = aParent;

        addMessageHandler(CRecordStartTarget.ID,
                          this::asyncStartTarget);
    }

    @Override
    @NotNull
    public String getJobName()
    {
        return mPath.toString();
    }

    @Override
    public @NotNull EThreadPriority getPriority()
    {
        // There must be threads in the engine with the same priority!
        return EThreadPriority.LOW;
    }

    private boolean asyncStartTarget(@NotNull final CEnvelope aEnvelope,
                                     @NotNull final CRecord aRecord) throws CException
    {
        if (aEnvelope.isAnswer())
        {
            return false;
        }
        else
        {
            LOG.info("Create hash for {}",
                     mPath);

            // hash the file
            // (1)
            final File file = mPath.toFile();
            final CMd5 md5 = CUtilMd5.calculate(file,
                                                null);
            if (md5 != null)
            {
                // send file hash to working target
                final String hash = md5.toBase64();

                // (2)
                final CEnvelope env = CEnvelope.forSingleTarget(mParent);
                final CRecord record = CRecordSetHash.create();
                CRecordSetHash.setPath(record,
                                       mPath.toString());
                CRecordSetHash.setHash(record,
                                       hash);
                sendNotification(env,
                                 record);
            }

            // (3)
            super.notifyJobStopped();
            aEnvelope.setResultSuccess();
            return true;
        }
    }
}
  • 1 An internal helper class of the kernel (CUtilMd5) calculates the MD5 hash, which is then encoded as a base64 string.
  • 2 The calculated hash is sent to the working target.
  • 3 The end of the job is signaled. The job target does not terminate itself; the engine takes care of that.
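Note the division of labor: jobs never write to the working target's `mResult` map; they send a `CRecordSetHash` notification, and the working target updates the map in its message handler. Presumably the target handles its messages one at a time, which is why a plain `HashMap` suffices there. The sketch below reproduces the idea in plain Java with a `BlockingQueue` as a hypothetical mailbox. One deliberate difference: every job reports exactly once (with a null hash on failure), so the owner can count replies instead of waiting for a `CRecordNoMoreJobs` signal.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Hypothetical sketch: workers post (path, hash) messages; only the owner thread writes the map.
final class MailboxCollector {

    // One message per finished job, like CRecordSetHash.
    static final class SetHash {
        final String path;
        final String hash; // null if hashing failed

        SetHash(String path, String hash) {
            this.path = path;
            this.hash = hash;
        }
    }

    static Map<String, String> collect(List<String> paths, Function<String, String> hashFn, int threads)
            throws InterruptedException {
        final BlockingQueue<SetHash> mailbox = new LinkedBlockingQueue<>();
        final ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            for (String path : paths) {
                // Each job reports exactly once, even on failure, so the owner can count replies.
                pool.execute(() -> {
                    String hash = null;
                    try {
                        hash = hashFn.apply(path);
                    } catch (RuntimeException e) {
                        // leave hash null; the owner skips it
                    }
                    mailbox.add(new SetHash(path, hash));
                });
            }
            // Owner side: a plain HashMap is safe because only this thread writes it.
            final Map<String, String> result = new HashMap<>();
            for (int received = 0; received < paths.size(); received++) {
                final SetHash msg = mailbox.take();
                if (msg.hash != null) { // same null check as asyncSetHash
                    result.put(msg.path, msg.hash);
                }
            }
            return result;
        } finally {
            pool.shutdown();
        }
    }
}
```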

The dependencies interface

As usual, the package follows the nyssr.net structure with a dependencies interface and a service starter. See also Package initialization.

interface IDependencies
{
    // Used for registration of the target
    INamespaceFactory getNamespaceFactory();

    // The job engine service factory
    IJobEngineFactory getJobEngineFactory();
}

The service starter

The service starter starts the package. It is created in the module handler of the plugin.

public class CPackageHashCalculator implements IServiceStarter, IDependencies
{
    private IService mService;
    private INamespaceFactory mNamespaceFactory;
    private IJobEngineFactory mJobEngineFactory;

    @Override
    public void getDependencies(@NotNull final IServiceDependencyList aDependencyList)
    {
        // (1)
        aDependencyList.add(INamespaceFactory.class);
        aDependencyList.add(IJobEngineFactory.class);
    }

    @Override
    public void start(@NotNull final IServiceRegistry aServiceRegistry) throws Exception
    {
        if (mService == null)
        {
            // (2)
            mNamespaceFactory = aServiceRegistry.getServiceOrThrow(INamespaceFactory.class);
            mJobEngineFactory = aServiceRegistry.getServiceOrThrow(IJobEngineFactory.class);

            // (3)
            mService = new CFileHashCalculator(this);
            mService.activate(aServiceRegistry);
        }
    }

    @Override
    public void stop(@NotNull final IServiceRegistry aServiceRegistry) throws Exception
    {
        if (mService != null)
        {
            // (4)
            mService.deactivate(aServiceRegistry);
            mService = null;
        }
    }

    @Override
    public INamespaceFactory getNamespaceFactory()
    {
        return mNamespaceFactory;
    }

    @Override
    public IJobEngineFactory getJobEngineFactory()
    {
        return mJobEngineFactory;
    }
}
  • 1 Registration of dependencies.
  • 2 Fetch dependencies.
  • 3 Start of the service.
  • 4 Terminate the service.
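The starter's contract is small: fetch its dependencies on start, create the service only once, pass itself to the service as the dependencies interface, and drop the service on stop. The following plain-Java sketch mirrors that contract; `Registry`, `EngineFactory`, `Deps`, `DemoService`, and `Starter` are hypothetical names and do not reproduce the nyssr.net `IServiceRegistry` or `IServiceStarter` API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry standing in for IServiceRegistry.
final class Registry {
    private final Map<Class<?>, Object> services = new HashMap<>();

    <T> void register(Class<T> type, T impl) {
        services.put(type, impl);
    }

    <T> T getOrThrow(Class<T> type) {
        final Object impl = services.get(type);
        if (impl == null) {
            throw new IllegalStateException("missing dependency: " + type.getName());
        }
        return type.cast(impl);
    }
}

// Hypothetical stand-in for a factory such as IJobEngineFactory.
interface EngineFactory {
    String createEngine(String name);
}

// Narrow view the service sees, like IDependencies.
interface Deps {
    EngineFactory getEngineFactory();
}

// The service only knows Deps, never the registry.
final class DemoService {
    private final Deps deps;

    DemoService(Deps deps) {
        this.deps = deps;
    }

    String describe() {
        return deps.getEngineFactory().createEngine("HashCalculator");
    }
}

// Starter: fetches dependencies, then passes itself to the service as Deps.
final class Starter implements Deps {
    private EngineFactory engineFactory;
    private DemoService service;

    void start(Registry registry) {
        if (service == null) { // a second start() is ignored
            engineFactory = registry.getOrThrow(EngineFactory.class);
            service = new DemoService(this);
        }
    }

    void stop() {
        service = null;
    }

    DemoService service() {
        return service;
    }

    @Override
    public EngineFactory getEngineFactory() {
        return engineFactory;
    }
}
```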

The messages

CRecordHashMyDir

This is the nanoservice message that a requester sends to the service.

{
  "id": "907df7c7-c435-45ed-a0c4-09ff0f7f6102",
  "name": "HASH_MY_DIR",
  "isService": "true",
  "namespaces": "SYSTEM",
  "description": "Create MD5 hashes for all files in a directory",
  "slots": [
    {
      "key": "path",
      "name": "PATH",
      "direction": "REQUEST",
      "mandatory": "true",
      "type": "STRING",
      "description": "The path of the directory to hash."
    },
    {
      "key": "path-arr",
      "name": "PATH_ARRAY",
      "direction": "ANSWER",
      "mandatory": "false",
      "type": "STRING_ARRAY",
      "description": "The paths of all files."
    },
    {
      "key": "md5-arr",
      "name": "MD5_ARRAY",
      "direction": "ANSWER",
      "mandatory": "false",
      "type": "STRING_ARRAY",
      "description": "The hashes of all files base64 encoded."
    }
  ]
}
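Because the answer carries two parallel string arrays rather than a list of pairs, a requester has to zip `PATH_ARRAY` and `MD5_ARRAY` back together and should check that their lengths agree. A minimal sketch, with `zip` as a hypothetical helper; reading the arrays out of the answer record is not shown because the generated accessor names are not given here.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical client-side helper for the CRecordHashMyDir answer slots.
final class HashAnswer {

    // Pairs paths[i] with hashes[i]; both arrays come from the same answer message.
    static Map<String, String> zip(String[] paths, String[] hashes) {
        if (paths == null || hashes == null) {
            return Collections.emptyMap(); // e.g. a cancelled request carries no answer data
        }
        if (paths.length != hashes.length) {
            throw new IllegalArgumentException(
                    "PATH_ARRAY has " + paths.length + " entries, MD5_ARRAY has " + hashes.length);
        }
        final Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i < paths.length; i++) {
            result.put(paths[i], hashes[i]);
        }
        return result;
    }
}
```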

CRecordSetHash

This message is sent by each job to the working target after finishing its work.

{
  "id": "67f918bb-673a-409c-a09a-519fcd47c33b",
  "name": "SET_HASH",
  "isService": "false",
  "namespaces": "",
  "description": "Set the base64 encoded hash value for a file.",
  "slots": [
    {
      "key": "1",
      "name": "PATH",
      "direction": "REQUEST",
      "mandatory": "true",
      "type": "STRING",
      "description": "The path of the file."
    },
    {
      "key": "2",
      "name": "HASH",
      "direction": "REQUEST",
      "mandatory": "true",
      "type": "STRING",
      "description": "The base64 encoded hash value of the file."
    }
  ]
}

See also

nyssr.net - Innovative Distributed System