JPPF User Guide
JPPF Manual
Table of Contents
1 Introduction
1.1 Intended audience
1.2 Prerequisites
1.3 Where to download
1.4 Installation
1.5 Running the standalone modules
2 JPPF Overview
2.1 Architecture and topology
2.2 Work distribution
2.3 Jobs and tasks granularity
2.4 Networking considerations
2.5 Sources of parallelism
3 Tutorial: A first taste of JPPF
3.1 Required software
3.2 Overview
3.3 Writing a JPPF task
3.4 Creating and executing a job
3.5 Running the application
3.6 Dynamic deployment
3.7 Job Management
3.8 Conclusion
4 Development Guide
4.1 Task objects
4.2 Dealing with jobs
4.3 Jobs runtime behavior, recovery and failover
4.4 Sharing data among tasks: the DataProvider API
4.5 Job Service Level Agreement
4.6 Job Metadata
4.7 Execution policies
4.8 The JPPFClient API
4.9 Connection pools
4.10 Notifications of client job queue events
4.11 Submitting multiple jobs concurrently
4.12 Jobs persistence in the driver
4.13 JPPF Executor Services
4.14 Grid topology monitoring
4.15 Job monitoring API
4.16 The JPPF statistics API
4.17 The Location API
5 Configuration guide
5.1 Configuration file specification and lookup
5.2 Includes, substitutions and scripted values in the configuration
5.3 Reminder: JPPF topology
5.4 Configuring a JPPF server
5.5 Node configuration
5.6 Client and administration console configuration
5.7 Common configuration properties
5.8 Putting it all together
5.9 Configuring SSL/TLS communications
5.10 The JPPF configuration API
6 Management and monitoring
6.1 Node management
6.2 Server management
6.3 Nodes management and monitoring via the driver
6.4 JVM health monitoring
7 Extending and Customizing JPPF
7.1 Global extensions
7.2 Drivers and nodes
7.3 Drivers and clients
7.4 Drivers
7.5 Nodes
7.6 Clients
7.7 Administration console
7.8 Flow of customizations in JPPF
8 Class Loading In JPPF
8.1 How it works
8.2 Class loader hierarchy in JPPF nodes
8.3 Relationship between UUIDs and class loaders
8.4 Built-in optimizations
8.5 Class loader delegation models
8.6 JPPF class loading extensions
8.7 Related sample
9 Load Balancing
9.1 What does it do?
9.2 Load balancing API
9.3 Built-in algorithms
9.4 Load-balancer state persistence
10 Database services
10.1 Configuring JDBC data sources
10.2 The JPPFDatasourceFactory API
10.3 Class loading and classpath considerations
11 J2EE Connector
11.1 Overview of the JPPF Resource Adapter
11.2 Supported Platforms
11.3 Configuration and build
11.4 How to use the connector API
11.5 Deployment on a J2EE application server
11.6 Packaging your enterprise application
11.7 Creating an application server port
12 .Net Bridge
12.1 Introduction
12.2 Using the JPPF .Net API
12.3 Management and monitoring
12.4 Recognizing .Net-capable nodes
12.5 Bridge configuration
12.6 .Net bridge limitations
13 Android Node
13.1 Introduction
13.2 Installation and setup
13.3 Creating and submitting jobs
13.4 Packaging the tasks for Android
13.5 Getting and providing feedback from the node and tasks
13.6 Security considerations
13.7 Building from the source code
14 Configuration properties reference
14.1 Server properties
14.2 Node properties
14.3 Node screen saver properties
14.4 Application client and admin console properties
14.5 Administration console properties
14.6 Common configuration properties
14.7 SSL properties
15 Execution policy reference
15.1 Execution Policy Elements
15.2 Execution policy properties
15.3 Execution policy XML schema
16 JPPF Deployment
16.1 Drivers and nodes as services
16.2 Running JPPF on Amazon's EC2, Rackspace, or other Cloud Services
16.3 Nodes in “Idle Host” mode
16.4 Offline nodes
16.5 Node provisioning
16.6 Runtime dependencies
16.7 Web administration console
17 Changes from previous versions
17.1 Changes in JPPF 6.0
17.2 Changes in JPPF 5.0
17.3 API changes in JPPF 4.0
1 Introduction
1.1 Intended audience
This manual is intended for developers, software engineers and architects who wish to discover, learn or deepen their
knowledge of JPPF and how it works. The intent is also to provide enough knowledge to not only write your own
applications using JPPF, but also extend it by creating add-ons and connectors to other frameworks.
1.2 Prerequisites
JPPF works on any system that supports Java. There is no operating system requirement: it can be installed on all flavors
of Unix, Linux, Windows and Mac OS, as well as on other systems such as z/OS or other mainframes.
JPPF requires the following installed on your machine:
• Java Standard Edition version 7 or later, with the environment variable JAVA_HOME pointing to your Java installation
root folder
• Apache Ant, version 1.9.0 or later, with the environment variable ANT_HOME pointing to the Ant installation root
folder
• Entries in the default system PATH for JAVA_HOME/bin and ANT_HOME/bin
1.3 Where to download
All JPPF software can be downloaded from the JPPF downloads page.
We have tried to give each module a name that makes sense. The format is JPPF-x.y.z-<module>.zip, where:
• x is the major version number
• y is the minor version number
• z is the patch release number - it will not appear if no patch has been released (i.e. if it is equal to 0)
• <module> is the name given to the module
1.4 Installation
Each JPPF download is in zip format. To install it, simply unzip it in a directory of your choice.
When unzipped, the content will be under a directory called JPPF-x.y.z-<module>.
1.5 Running the standalone modules
The JPPF distribution includes a number of standalone modules or components, which can be deployed and run
independently from any other, on separate machines and/or from a separate location on each machine.
These modules are the following:
• application template: this is the application template to use as a starting point for a new JPPF application, file JPPF-x.y.z-application-template.zip
• driver: this is the server component, file JPPF-x.y.z-driver.zip
• node: this is the node component, file JPPF-x.y.z-node.zip
• administration console: this is the management and monitoring user interface, file JPPF-x.y.z-admin-ui.zip
These can be run from either a shell script or an Ant script. The Ant script is always called
"build.xml" and it always has a default target called "run". To run any of these modules, simply type "ant" or "ant run" in a
command prompt or shell console. The provided shell scripts are named start<Component>.<ext>, where <Component> is
the JPPF component to run (e.g. “Node”, “Driver”, “Console”) and <ext> is the file extension, “bat” for Windows systems, or
“sh” for Linux/Unix-like systems.
2 JPPF Overview
2.1 Architecture and topology
A JPPF grid is made of three different types of components that communicate together:
• clients are entry points to the grid and enable developers to submit work via the client APIs
• servers, also called drivers, are the components that receive work from the clients and dispatch it to the nodes
• nodes perform the actual work execution
The figure below shows how all the components are organized together:
From this picture, we can see that the server plays a central role, and its interactions with the nodes define a master /
worker architecture, where the server (i.e. master) distributes the work to the nodes (i.e. workers). This also represents
the most common topology in a JPPF grid, where each client is connected to a single server, and many nodes are
attached to the same server.
As with any such architecture, this one faces the risk of a single point of failure. To mitigate this risk, JPPF provides the
ability to connect multiple servers together in a peer-to-peer network and additional connectivity options for clients and
nodes, as illustrated in this figure:
Note how some of the clients are connected to multiple servers, providing failover as well as load balancing capabilities.
In addition, and not visible in the previous figure, the nodes have a failover mechanism that will enable them to attach to a
different server, should the one they are attached to fail or die.
The connection between two servers is directional: if server A is connected to server B then A will see B as a client, and B
will see A as a node. This relationship can be made bi-directional by also connecting B to A. Note that in this scenario,
each server taken locally still acts as a master in a master/worker paradigm.
In short, we can say that the single point of failure issue is addressed by a combination of redundancy and dynamic
reconfiguration of the grid topology.
2.2 Work distribution
To understand how the work is distributed in a JPPF grid, and what role is played by each component, we will start by
defining the two units of work that JPPF handles.
A task is the smallest unit of work that can be handled in the grid. From the JPPF perspective, it is considered atomic.
A job is a logical grouping of tasks that are submitted together, and may define a common service level agreement (SLA)
with the JPPF grid. The SLA can have a significant influence on how the job's work will be distributed in the grid, by
specifying a number of behavioral characteristics:
• rule-based filtering of nodes, specifying which nodes the work can be distributed to (aka execution policies)
• maximum number of nodes the work can be distributed to
• job priority
• start and expiration schedule
• user-defined metadata which can be used by the load balancer
To illustrate the most common flow of a job's execution, let's take a look at the following flow chart:
This chart shows the different steps involved in the execution of a job, and where each of them takes place with regards
to the grid component boundaries.
It also shows that the main source of parallelism is provided by the load balancer, whose role is to split each job into
multiple subsets that can be executed on multiple nodes in parallel. There are other sources of parallelism at different
levels, and we will describe them in the next sections.
2.3 Jobs and tasks granularity
The granularity of the jobs and tasks, that is, the way you divide the workload into small independent units of work, has a
significant impact on performance. By design, JPPF is particularly well adapted for workloads that can be divided into
many small tasks independent from each other, also known as the class of embarrassingly parallel problems.
There are, however, limits to how much performance can be gained by dividing a workload. In particular, if the tasks are
too small, the overhead of executing them on a grid may largely outweigh the benefits of parallelization, resulting in an
overall performance loss. On the other hand, if the tasks are too coarse, the execution time may be equivalent to what
you'd get with a sequential execution. The granularity of the tasks must therefore be carefully considered.
To illustrate this notion, let's take an example: the multiplication of two square matrices. Let's say we have 2 square
matrices A and B of size n. We define the matrix C as the result of their multiplication, where each element is computed as:
c(i,j) = a(i,1) × b(1,j) + a(i,2) × b(2,j) + ... + a(i,n) × b(n,j), for i, j in 1..n
We can see that each element c(i,j) of matrix C is the result of n multiplications and (n - 1) additions. The matrix
multiplication is therefore the result of (2n - 1)n² arithmetic operations and the computational complexity is in O(n³).
Let's consider multiple ways to divide this into independent tasks:
– at the coarsest level, we could have a task that performs the entire matrix multiplication. There is no work division and
therefore no performance gain when compared to the sequential way of doing it, unless we consider performing
many matrix multiplications in parallel
– at the finest level, we could have each task perform the computation of a single c(i,j) element, that is, (2n - 1) arithmetic
operations, with a complexity in O(n). These operations are extremely fast and will be measured in microseconds at
worst. In this case, and unless n is very large, the overhead of parallelizing the tasks and distributing them over a grid
will cost more than the actual computation
– a more appropriate level of granularity would be to have each task compute an entire column of the resulting matrix C,
as in the sketch below. Each task will perform (2n - 1)n arithmetic operations and its complexity is in O(n²). For
sufficiently large values of n, executing the tasks in parallel will provide a noticeable performance gain. The greater
the value of n, the higher the gain.
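Here is a minimal sketch of such a column-computing task, using the AbstractTask class presented in the Development
Guide. The class and field names are ours, for illustration only:
public class MatrixColumnTask extends AbstractTask<double[]> {
  private final double[][] a;     // the full matrix A
  private final double[] bColumn; // column j of matrix B

  public MatrixColumnTask(final double[][] a, final double[] bColumn) {
    this.a = a;
    this.bColumn = bColumn;
  }

  @Override
  public void run() {
    final int n = a.length;
    final double[] cColumn = new double[n];
    // c(i,j) = a(i,1) x b(1,j) + ... + a(i,n) x b(n,j), for each row i
    for (int i = 0; i < n; i++) {
      double sum = 0d;
      for (int k = 0; k < n; k++) sum += a[i][k] * bColumn[k];
      cColumn[i] = sum;
    }
    // store column j of C as this task's result
    setResult(cColumn);
  }
}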
Conclusion: choosing the granularity of the tasks is a very important part of the design of a grid-enabled application. As a
rule of thumb, if the execution of a task takes less than 10 milliseconds, you should consider coarser tasks or sequential
execution. At the same time, remember that JPPF is good at executing workloads with many tasks. You therefore have to
find a balance between the granularity of the tasks and the level of parallelization that the division of the workload
provides.
2.4 Networking considerations
2.4.1 Two channels per connection
Each connection between a server and any other component is in fact a grouping of two network channels:
• one channel is used to transport job data
• the other channel is used by the JPPF distributed class loader, which allows Java classes to be deployed on-demand
where they are needed, completely transparently from a developer's perspective.
2.4.2 Synchronous networking
In JPPF, all network communications are synchronous and follow a protocol based on a request/response paradigm. The
attribution of requester vs. responder role depends on which components communicate and through which channel.
We illustrate this in the following picture:
This communication model has a number of important implications:
• nodes can only process one job at a time; however they can execute multiple tasks in parallel
• in the same way, a single client / server connection can only process one job at a time; however, each client can be
connected multiple times to the same server, or multiple times to many servers
• in the case of a server-to-server communication, only one job can be processed at a time, since a server attaches to
another server in exactly the same way as a node.
2.4.3 Protocol
JPPF components communicate by exchanging messages. As described in the previous section, each JPPF transaction
will be made of a request message, followed by a response message.
Messages all have the same structure, and are made of one or more blocks of data (in fact blocks of bytes), each
preceded by its block size. Each block of data represents a serialized object graph. Thus, each message can be
represented generically as follows:
[Size 1][Serialized Object 1] ..... [Size n][Serialized Object n]
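As an illustration only (this is not JPPF's actual networking code), reading one such size-prefixed block could look like
the following sketch, using java.io.DataInputStream and standard Java serialization:
// illustrative sketch: read a single size-prefixed block from the stream
static Object readBlock(DataInputStream in) throws Exception {
  // each block of data is preceded by its size
  int size = in.readInt();
  byte[] buffer = new byte[size];
  in.readFully(buffer);
  // each block of data is a serialized object graph
  try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(buffer))) {
    return ois.readObject();
  }
}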
The actual message format is different for each type of communication channel, and may also differ depending on
whether it is a request or response message:
Job data channel
A job data request is composed of the following elements:
• a header, which is an object representing information about the job, including the number of tasks in the job, the job
SLA, job metadata, and additional information required internally by the JPPF components.
• a data provider, which is a read-only container for data shared among all the tasks
• the tasks themselves
It can be represented as follows:
[Header size][Header (nb tasks)][Data provider size][Data provider data][Size 1][Task 1] ..... [Size n][Task n]
To read the full message, JPPF has to first read the header and obtain the number of tasks in the job.
The response will be in a very similar format, except that it doesn't have a data provider: being read-only, no change to its
content is expected, which removes the need to send it in the response. Thus the response can be represented as:
[Header size][Header (nb tasks)][Size 1][Task 1] ..... [Size n][Task n]
Class loader channel
A class loader message, either request or response, is always made of a single serialized object. Therefore, the message
structure is always as follows:
[Size][Resource request / response]
2.5 Sources of parallelism
2.5.1 At the client level
There are three ways JPPF clients can provide parallel processing, which may be used individually or in any combination:
Single client, multiple concurrent jobs
A single client may submit multiple jobs in parallel. This differs from the single client/single job scenario in that the jobs
must be submitted in non-blocking mode, and their results are retrieved asynchronously. Another difference is that the
client must establish multiple connections to the server to enable parallelism, and not just asynchronous submission.
When multiple non-blocking jobs are submitted over a single connection, only one at a time will be submitted, and the
others will be queued on the client side. The only parallelism is in the submission of the jobs, but not in their execution.
To enable parallel execution of multiple jobs, it is necessary to configure a pool of connections for the client. The size of
the pool determines the number of jobs that can be processed in parallel by the server.
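For example, with server discovery enabled, the pool size can be set in the client's configuration file; the value below is
just an illustration (see the configuration guide for the full details):
# client configuration: 4 connections per discovered driver, so that up to
# 4 of this client's jobs can be processed in parallel by the server
jppf.pool.size = 4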
Multiple clients
In this configuration, the parallelism occurs naturally, by letting the different clients work concurrently.
Mixed local and remote execution
Clients have the ability to execute jobs locally, within the same process, rather than remotely. They may also use both
capabilities at the same time, in which case a load-balancing mechanism will provide an additional source of parallelism.
2.5.2 At the server level
The server has a number of factors that determine what can be parallelized and how much:
Number of connected clients
The number of connected clients, or more accurately, client connections, has a direct influence on how many jobs can be
processed by the grid at any one time.
The relationship is defined as: maximum number of parallel jobs = total number of client connections
Number of attached nodes
This determines the maximum number of jobs that can be executed on the grid nodes. With regards to the previous point,
we can redefine it as: maximum number of parallel jobs = min(total number of client connections, total number of nodes).
For example, with 3 client connections and 10 attached nodes, at most min(3, 10) = 3 jobs can execute in parallel.
Load balancing
This is the mechanism that splits the jobs into multiple subsets of their tasks, and distributes these subsets over the
available nodes. Given the synchronous nature of the server-to-node connections, a node is available only when it is not
already executing a job subset. The load balancing also computes how many tasks will be sent to each node, in a way
that can be static, dynamic, or even user-defined.
Job SLA
The job Service Level Agreement is used to filter out nodes in which the user does not want to see a job executed.
This can be done by specifying an execution policy (rules-based filtering) for the job, or by configuring the maximum
number of nodes a job can run on (grid partitioning).
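For example (execution policies are detailed in the Development Guide), a job could be restricted to Linux nodes with a
policy like the following sketch, which assumes a JPPFJob instance named job and matching on the "os.name" property:
// sketch: only nodes whose OS name equals "Linux" (ignoring case) may execute the job
job.getSLA().setExecutionPolicy(new Equal("os.name", true, "Linux"));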
Parallel I/O
Each server maintains internally a pool of threads dedicated to network I/O. The size of this pool determines how many
nodes the server can communicate with in parallel, at any given time. Furthermore, as communication with the nodes is
non-blocking, this pool of I/O threads is part of a mechanism that achieves a preemptive multitasking of the network I/O.
This means that, even if you have a limited number of I/O threads, the overall result will be as if the server were
communicating with all nodes in parallel.
2.5.3 At the node level
To execute tasks, each node uses a pool of threads that are called “processing threads”. The size of the pool determines
the maximum number of tasks a single node can execute in parallel. The pool size may be adjusted either statically or
dynamically to account for the actual number of processors available to the node, and for the tasks' resource usage
profile (i.e. I/O-bound tasks versus CPU-bound tasks).
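As an illustration, the size of this pool is set through the node's configuration file; the value below is only an example
(this property is covered in the configuration chapters):
# node configuration: execute up to 8 tasks in parallel in this node
jppf.processing.threads = 8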
3 Tutorial: A first taste of JPPF
3.1 Required software
In this tutorial, we will be writing a sample JPPF application, and we will run it on a small grid. To this effect, we will need
to download and install the following JPPF components:
• JPPF application template: this is the JPPF-x.y.z-application-template.zip file
• JPPF driver: this is the JPPF-x.y.z-driver.zip file
• JPPF node: this is the JPPF-x.y.z-node.zip file
• JPPF administration console: this is the JPPF-x.y.z-admin-ui.zip file
Note: “x.y.z” designates the latest version of JPPF (major.minor.update). Generally, “x.y.0” is abbreviated into “x.y”.
These files are all available from the JPPF installer and/or from the JPPF download page.
In addition to this, Java 1.7 or later and Apache Ant 1.8.0 or later should already be installed on your machine.
We will assume the creation of a new folder called "JPPF-Tutorial", in which all these components are unzipped. Thus, we
should have the following folder structure:
» JPPF-Tutorial
» JPPF-x.y.z-admin-ui
» JPPF-x.y.z-application-template
» JPPF-x.y.z-driver
» JPPF-x.y.z-node
3.2 Overview
3.2.1 Tutorial organization
We will base this tutorial on a pre-existing application template, which is one of the components of the JPPF distribution.
The advantage is that most of the low-level wiring is already written for us, and we can thus focus on the steps to put
together a JPPF application. The template is a very simple, but fully working, JPPF application, and contains fully
commented source code, configuration files and scripts to build and run it.
It is organized with the following directory structure:
» root directory: contains the scripts to build and run the application
» src: this is where the sources of the application are located
» classes: the location where the Java compiler will place the built sources
» config: contains the JPPF and logging configuration files
» lib: contains the required libraries to build and run the application
3.2.2 Expectations
We will learn how to:
• write a JPPF task
• create a job and execute it
• process the execution results
• manage JPPF jobs
• run a JPPF application
The features of JPPF that we will use:
• JPPF task and job APIs
• local code changes automatically accounted for
• JPPF client APIs
• management and monitoring console
• configuring JPPF
By the end of this tutorial, we will have a full-fledged JPPF application that we can build, run, monitor and manage in a
JPPF grid. We will also have gained knowledge of the workings of a typical JPPF application and we will be ready to write
real-life, grid-enabled applications.
3.3 Writing a JPPF task
A JPPF task is the smallest unit of code that can be executed on a JPPF grid. From a JPPF perspective, it is thus defined
as an atomic code unit. A task is always defined as an implementation of the interface Task, which extends the Runnable
interface. The part of a task that will be executed on the grid is whatever is written in its run() method.
From a design point of view, writing a JPPF task will comprise 2 major steps:
• create an implementation of Task.
• implement the run() method.
From the template application root folder, navigate to the folder src/org/jppf/application/template. You will see 2 Java files
in this folder: "TemplateApplicationRunner.java" and "TemplateJPPFTask.java". Open the file "TemplateJPPFTask.java" in
your favorite text editor.
In the editor you will see a full-fledged JPPF task declared as follows:
public class TemplateJPPFTask extends AbstractTask<Object>
Here we use the more convenient class AbstractTask, which implements all methods in Task, except for run().
Below this, you will find a run() method declared as:
public void run() {
  // write your task code here.
  System.out.println("Hello, this is the node executing a template JPPF task");
  // ...
  // eventually set the execution results
  setResult("the execution was performed successfully");
}
We can guess that this task will first print a "Hello …" message to the console, then set the execution result by calling the
setResult() method with a string message. The setResult() method actually takes any object, and is provided as a
convenience to store the results of the task execution, for later retrieval.
In this method, to show that we have customized the template, let's replace the line "// ..." with a statement printing a
second message, for instance "In fact, this is more than the standard template". The run() method becomes:
public void run() {
  // write your task code here.
  System.out.println("Hello, this is the node executing a template JPPF task");
  System.out.println("In fact, this is more than the standard template");
  // eventually set the execution results
  setResult("the execution was performed successfully");
}
Do not forget to save the file for this change to be taken into account.
The next step is to create a JPPF job from one or multiple tasks, and execute this job on the grid.
3.4 Creating and executing a job
A job is a grouping of tasks with a common set of characteristics and a common SLA. These characteristics include:
• common data shared between tasks
• a priority
• a maximum number of nodes a job can be executed on
• an execution policy describing which nodes it can run on
• a suspended indicator, that enables submitting a job in suspended state, waiting for an external command to resume
or start its execution
• a blocking/non-blocking indicator, specifying whether the job execution is synchronous or asynchronous from the
application's point of view
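As a quick illustration of some of these characteristics (detailed in the Job Service Level Agreement section), here is a
brief sketch using the job SLA accessors; the values are arbitrary examples:
JPPFJob job = new JPPFJob();
job.setName("example job");
// give this job a higher priority than the default of 0
job.getSLA().setPriority(10);
// restrict execution of this job to at most 2 nodes
job.getSLA().setMaxNodes(2);
// submit the job in suspended state
job.getSLA().setSuspended(true);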
3.4.1 Creating and populating a job
In the JPPF APIs, a job is represented as an instance of the class JPPFJob.
To see how a job is created, let's open the source file "TemplateApplicationRunner.java" in the folder JPPF-x.y.z-application-template/src/org/jppf/application/template. In this file, navigate to the method createJob().
This method is written as follows:
public JPPFJob createJob(String jobName) throws Exception {
  // create a JPPF job
  JPPFJob job = new JPPFJob();
  // give this job a readable name that we can use to monitor and manage it.
  job.setName(jobName);
  // add a task to the job.
  job.add(new TemplateJPPFTask());
  // add more tasks here ...
  // there is no guarantee on the order of execution of the tasks,
  // however the results are guaranteed to be returned in the same
  // order as the tasks.
  return job;
}
We can see that creating a job is done by calling the default constructor of class JPPFJob. The call to the method
job.setName(String) is used to give the job a meaningful and readable name that we can use later to manage it. If
this method is not called, an id is automatically generated, as a string of 32 hexadecimal characters.
Adding a task to the job is done by calling the method add(Object task, Object...args). The optional arguments
are used when we want to execute other forms of tasks, that are not implementations of Task. We will see their use in the
more advanced sections of the JPPF user manual. As we can see, all the work is already done in the template file, so
there is no need to modify the createJob() method for now.
3.4.2 Executing a job and processing the results
Now that we have learned how to create a job and populate it with tasks, we still need to execute this job on the grid, and
process the results of this execution. Still in the source file "TemplateApplicationRunner.java", let's navigate to the
main(String...args) method. We will first take a closer look at the try block, which contains a very important
initialization statement:
jppfClient = new JPPFClient();
This single statement initializes the JPPF framework in your application. When it is executed JPPF will do several things:
• read the configuration file
• establish a connection with one or multiple servers for job execution
• establish a monitoring and management connection with each connected server
• register listeners to monitor the status of each connection
As you can see, the JPPF client has a non-negligible impact on memory and network resources. This is why we
recommend always using the same instance throughout your application. This will also ensure greater scalability, as the
client is also designed for concurrent use by multiple threads. To this effect, we have declared it in a try-with-resources
block and provide it as a parameter to any method that needs it, in TemplateApplicationRunner.java.
It is always a good practice to release the resources used by the JPPF client when they are no longer used. Since
JPPFClient implements AutoCloseable, this can be done conveniently in a try-with-resources statement:
try (JPPFClient jppfClient = new JPPFClient()) {
// ... use the JPPF client ...
}
Back to the main method, after initializing the JPPF client, the next steps are to initialize our job runner, create a job and
execute it:
// create a runner instance.
TemplateApplicationRunner runner = new TemplateApplicationRunner();
// Create and execute a blocking job
runner.executeBlockingJob(jppfClient);
As we can see, the job creation, its execution and the processing of its results are all encapsulated in a call to the
method executeBlockingJob(JPPFClient):
/**
 * Execute a job in blocking mode.
 * The application will be blocked until the job execution is complete.
 * @param jppfClient the {@link JPPFClient} instance which submits the job for execution.
 * @throws Exception if an error occurs while executing the job.
 */
public void executeBlockingJob(JPPFClient jppfClient) throws Exception {
  // Create a job
  JPPFJob job = createJob("Template blocking job");
  // set the job in blocking mode.
  job.setBlocking(true);
  // Submit the job and wait until the results are returned.
  // The results are returned as a list of Task<?> instances,
  // in the same order as the one in which the tasks were initially added to the job.
  List<Task<?>> results = jppfClient.submitJob(job);
  // process the results
  processExecutionResults(job.getName(), results);
}
The call to createJob("Template blocking job") is exactly what we saw in the previous section.
The next statement in this method ensures that the job will be submitted in blocking mode, meaning that the application
will block until the job is executed:
job.setBlocking(true);
This is, in fact, optional since submission in blocking mode is the default behavior in JPPF.
The next statement will send the job to the server and wait until it has been executed and the results are returned:
List<Task<?>> results = jppfClient.submitJob(job);
We can see that the results are returned as a list of Task<?> objects. It is guaranteed that each task in this list has the
same position as the corresponding task that was added to the job. In other words, the results are always in the same
order as the tasks in the job.
The last step is to interpret and process the results. From the JPPF point of view, there are two possible outcomes of the
execution of a task: one that raised a Throwable, and one that did not. When an uncaught Throwable (i.e. generally an
instance of a subclass of java.lang.Error or java.lang.Exception) is raised, JPPF will catch it and set it as the
outcome of the task. To do so, the method Task.setThrowable(Throwable) is called. JPPF considers that exception
processing is part of the life cycle of a task and provides the means to capture this information accordingly.
This explains why, in our template code, we have separated the result processing of each task in 2 blocks:
public void processExecutionResults(String jobName, List<Task<?>> results) {
  // process the results
  for (Task<?> task: results) {
    if (task.getThrowable() != null) {
      // process the exception here ...
    } else {
      // process the result here ...
    }
  }
}
The actual results of the computation of a task can be any attribute of the task, or any object accessible from them. The
Task API provides two convenience methods to help with this: setResult(E) and getResult(); however, it is
not mandatory to use them, and you can implement your own result handling scheme, or it could simply be a part of the
task's design.
As an example for this tutorial, let's modify this part of the code to display the exception message if an exception was
raised, and to display the result otherwise:
if (task.getThrowable() != null) {
  System.out.println("An exception was raised: " + task.getThrowable().getMessage());
} else {
  System.out.println("Execution result: " + task.getResult());
}
We can now save the file and close it.
3.5 Running the application
We are now ready to test our JPPF application. To this effect, we will need to first start a JPPF grid, as follows:
Step 1: start a server
Go to the JPPF-x.y.z-driver folder and open a command prompt or shell console. Type "startDriver.bat" on Windows or
"./startDriver.sh" on Linux/Unix. You should see the following lines printed to the console:
driver process id: 6112, uuid: 4DC8135C-A22D-2545-E615-C06ABBF04065
management initialized and listening on port 11191
ClientClassServer initialized
NodeClassServer initialized
ClientJobServer initialized
NodeJobServer initialized
Acceptor initialized
- accepting plain connections on port 11111
- accepting secure connections on port 11443
JPPF Driver initialization complete
The server is now ready to process job requests.
Step 2: start a node
Go to the JPPF-x.y.z-node folder and open a command prompt or shell console. Type "startNode.bat" on Windows or
"./startNode.sh" on Linux/Unix. You will then see the following lines printed to the console:
node process id: 3336, uuid: 4B7E4D22-BDA9-423F-415C-06F98F1C7B6F
Attempting connection to the class server at localhost:11111
Reconnected to the class server
JPPF Node management initialized
Attempting connection to the node server at localhost:11111
Reconnected to the node server
Node successfully initialized
Together, this node and the server constitute the smallest JPPF grid that you can have.
Step 3: run the application
Go to the JPPF-x.y.z-application-template folder and open a command prompt or shell console. Type "ant". This time, the
Ant script will first compile our application, then run it. You should see these lines printed to the console:
client process id: 4780, uuid: 011B43B5-AE6B-87A6-C11E-0B2EBCFB9A89
[client: jppf_discovery-1-1 - ClassServer] Attempting connection to the class server ...
[client: jppf_discovery-1-1 - ClassServer] Reconnected to the class server
[client: jppf_discovery-1-1 - TasksServer] Attempting connection to the task server ...
[client: jppf_discovery-1-1 - TasksServer] Reconnected to the JPPF task server
Results for job 'Template blocking job' :
Template blocking job - Template task, execution result: the execution was performed
successfully
You will notice that the last printed line is the same message that we used in our task in the run() method, to set the
result of the execution in the statement:
setResult("the execution was performed successfully");
Now, if you switch back to the node console, you should see that 2 new messages have been printed:
Hello, this is the node executing a template JPPF task
In fact, this is more than the standard template
These 2 lines are those that we actually coded at the beginning of the task's run() method:
System.out.println("Hello, this is the node executing a template JPPF task");
System.out.println("In fact, this is more than the standard template");
From these messages, we can conclude that our application was run successfully. Congratulations!
At this point, there is however one aspect that we have not yet addressed: since the node is a separate process from our
application, how does it know to execute our task? Remember that we have not even attempted to deploy the
application classes to any specific location. We have simply compiled them so that we can execute our application locally.
This topic is the object of the next section of this tutorial.
3.6 Dynamic deployment
One of the greatest features of JPPF is its ability to dynamically load the code of an application that was deployed only
locally. JPPF extends the standard Java class loading mechanism so that, by simply using the JPPF APIs, the classes of
an application are loaded to any remote node that needs them. The benefit is that no deployment of the application is
required to have it run on a JPPF grid, no matter how many nodes or servers are present in the grid. Furthermore, this
mechanism is totally transparent to the application developer.
A second major benefit is that code changes are automatically taken into account, without any need to restart the nodes
or the server. This means that, when you change any part of the code executed on a node, all you have to do is recompile
the code and run the application again, and the changes will take effect immediately, on all the nodes that execute the
application.
We will now demonstrate this by making a small, but visible, code change and running it against the server and node we
have already started. If you have stopped them already, just perform again all the steps described in the previous section,
before continuing.
Let's open again the source file "TemplateJPPFTask.java" in src/org/jppf/application/template/, and navigate to the run()
method. Let's replace the first two lines with the following:
System.out.println("*** We are now running a modified version of the code ***");
The run() method should now look like this:
public void run() {
// write your task code here.
System.out.println("*** We are now running a modified version of the code ***");
// eventually set the execution results
setResult("the execution was performed successfully");
}
Save the changes to the file, and open or go back to a command prompt or shell console in the JPPF-x.y.z-application-template folder. From there, type "ant" to run the application again. You should now see the same messages as in the
initial run displayed in the console. This is what we expected. On the other hand, if you switch back to the node console,
you should now see a new message displayed:
*** We are now running a modified version of the code ***
Success! We have successfully executed our new code without any explicit redeployment.
3.7 Job Management
Now that we are able to create, submit and execute a job, we can start thinking about monitoring and eventually
controlling its life cycle on the grid. To do that, we will use the JPPF administration and monitoring console. The JPPF
console is a standalone graphical tool that provides user-friendly interfaces to:
• obtain statistics on server performance
• define, customize and visualize server performance charts
• monitor and control the status and health of servers and nodes
• monitor and control the execution of the jobs on the grid
• manage the workload and load-balancing behavior
3.7.1 Preparing the job for management
In our application template, the job that we execute on the grid has a single task. As we have seen, this task is very short-lived, since it executes in no more than a few milliseconds. This definitely will not allow us to monitor or manage it with
our bare human reaction time. For the purpose of this tutorial, we will now adapt the template to something more realistic
from this perspective.
Step 1: make the tasks last longer
What we will do here is add a delay to each task, before it terminates. It will do nothing during this time, only wait for a
specified duration. Let's edit again the source file "TemplateJPPFTask.java" in
JPPF-x.y.z-application-template/src/org/jppf/application/template/ and modify the run() method as follows:
public void run() {
  // write your task code here.
  System.out.println("*** We are now running a modified version of the code ***");
  // simply wait for 3 seconds
  try {
    Thread.sleep(3000L);
  } catch(InterruptedException e) {
    setThrowable(e);
    return;
  }
  // eventually set the execution results
  setResult("the execution was performed successfully");
}
Note that here, we make an explicit call to setThrowable(), in case an InterruptedException is raised. Since the
exception would occur in the node, capturing it will allow us to know what happened from the application side.
Step 2: add more tasks to the job, submit it as suspended
This time, our job will contain more than one task. In order for us to have the time to manipulate it from the administration
console, we will also start it in suspended mode. To this effect, we will modify the method createJob() of the
application runner "TemplateApplicationRunner.java" as follows:
public JPPFJob createJob() throws Exception {
  // create a JPPF job
  JPPFJob job = new JPPFJob();
  // give this job a readable unique id that we can use to monitor and manage it.
  job.setName("Template Job Id");
  // add 10 tasks to the job.
  for (int i=0; i<10; i++) job.add(new TemplateJPPFTask());
  // start the job in suspended mode
  job.getSLA().setSuspended(true);
  return job;
}
Step 3: start the JPPF components
If you have stopped the server and node, simply restart them as described in the first two steps of section 3.5 of this
tutorial. We will also start the administration console: go to the JPPF-x.y.z-admin-ui folder and open a command prompt or
shell console. Type "ant". When the console is started, you will see a panel named "Topology" displaying the servers and
the nodes attached to them. It should look like this:
We can see here that a server is started on machine "lolo-quad" and that it has a node attached to it. The color for the
server is a health indicator, green meaning that it is running normally and red meaning that it is down.
Let's switch to the "Job Data" panel, which should look like this:
We also see the color-coded driver health information in this panel. There is currently no other element displayed,
because we haven't submitted a job yet.
Step 4: start a job
We will now start a job by running our application: go to the JPPF-x.y.z-application-template folder and open a command
prompt or shell console. Type "ant". Switch back to the administration console. We should now see some change in the
display:
We now see that a job is present in the server's queue, in suspended state (yellow highlighting). Here is an explanation of
the columns in the table:
• "Driver / Job / Node" : displays an identifier for a server, for a job submitted to that server, or for a node to which
some of the tasks in the job have been dispatched for execution
• "State" : the current state of a job, either "Suspended" or "Executing"
• "Initial task count" : the number of tasks in the job at the time it was submitted by the application
• "Current task count": the number of tasks remaining in the job, that haven't been executed
• "Priority" : this is the priority, of the job, the default value is 0.
• "Max nodes" : the maximum number of nodes a job can be executed on. By default, there is no limit, which is
represented as the infinity symbol
Step 5: resume the job execution
Since the job was submitted in suspended state, we will resume its execution manually from the console. Select the line
where the job "Template Job Id" is displayed. You should see that some buttons are now activated. Click on the resume
button to resume the job execution, as shown below:
As soon as we resume the job, the server starts distributing tasks to the node, and we can see that the current task count
starts decreasing accordingly, and the job status has been changed to "Executing":
You are encouraged to experiment with the tool and the code. For example you can add more tasks to the job, make them
last longer, suspend, resume or terminate the job while it is executing, etc...
3.8 Conclusion
In this tutorial, we have seen how to write a JPPF-enabled application from end to end. We have also learned the basic
APIs that allow us to write an application made of atomic and independent execution units called tasks, and group them
into jobs that can be executed on the grid. We have also learned how jobs can be dynamically managed and monitored
while executing. Finally, we also learned that, even though an application can be distributed over any number of nodes,
there is no need to explicitly deploy the application code, since JPPF implicitly takes care of it.
4 Development Guide
4.1 Task objects
In JPPF terms, a task is the smallest unit of execution that can be handled by the framework. We will say that it is an
atomic execution unit. A JPPF application creates tasks, groups them into a job, and submits the job for execution on the
grid.
4.1.1 Task
Task is the base interface for any task that is run by JPPF. We will see in the next sections that other forms of tasks, that
do not inherit from Task, are still wrapped by the framework in an implementation of Task.
Task is defined as follows:
public interface Task<T> extends Runnable, Serializable {
...
}
We have outlined three important keywords that characterize Task:
• interface: Task cannot be used directly, it must be implemented/extended to construct a real task
• Runnable: when writing a JPPF task, the run() method of java.lang.Runnable must be implemented. This is the
part of a task that will be executed on a remote node.
• Serializable: tasks are sent to servers and nodes over a network. JPPF uses the Java serialization mechanism to
transform task objects into a form appropriate for networking
The JPPF API provides a convenient abstract implementation of Task, which implements all the methods of Task except
run(): to write a real task in your application, you simply extend AbstractTask to implement your own type:
public class MyTask extends AbstractTask<Object> {
  @Override
  public void run() {
    // ... your code here ...
  }
}
We will now review the functionalities that are inherited from Task.
If you are familiar with the JPPF 3.x APIs, please note that the legacy class JPPFTask is now redefined as:
public class JPPFTask extends AbstractTask<Object> {
}
4.1.1.1 Execution results handling
Task provides 2 convenience methods to store and retrieve the results of the execution:
• public void setResult(Object result) : stores the execution result; the argument must be serializable
• public Object getResult() : retrieves the execution result
Here is an example using these methods:
public class MyTask extends AbstractTask<Object> {
  @Override
  public void run() {
    // ... some code here ...
    setResult("This is the result");
  }
}
and later in your application, you would use:
String result = myTask.getResult();
Using getResult() and setResult() is not mandatory. As we mentioned earlier, these methods are provided as
conveniences with a meaningful semantics attached to them. There are many other ways to store and retrieve execution
results, which can be used to the exclusion of others, or in any combination. These include, but are not limited to:
• using custom attributes in the task class and their accessors
• storing and getting data to/from a database
• using a file system
• using third-party applications or libraries
• etc ...
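For instance, here is a minimal sketch of the first approach, using a custom task attribute instead of setResult(); the
class and attribute names are ours, for illustration only:
public class StatsTask extends AbstractTask<Object> {
  // custom attribute holding the execution result
  private double average;

  @Override
  public void run() {
    // ... compute the average ...
    this.average = 42.5d; // example value
  }

  // custom accessor used by the application instead of getResult()
  public double getAverage() {
    return average;
  }
}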
4.1.1.2 Exception handling - task execution
Exception handling is a very important part of processing a task. In effect, exceptions may arise from many places: in the
application code, in the JVM, in third-party APIs, etc... To handle this, JPPF provides both a mechanism to process
uncaught exceptions and methods to store and retrieve exceptions that arise while executing a task.
Task provides 2 methods to explicitly handle exceptions:
• public void setThrowable(Throwable t) : store a throwable for later retrieval
• public Throwable getThrowable() : retrieve a throwable that was thrown during execution
Here is an example of explicit exception handling:
public class MyTask extends AbstractTask<Object> {
  @Override
  public void run() {
    try {
      // ... some code here ...
    } catch(Exception e) {
      setThrowable(e);
    }
  }
}
Later on, you can retrieve the throwable as follows:
Throwable throwable = myTask.getThrowable();
JPPF also automatically handles uncaught throwables. Uncaught throwables are never propagated beyond the scope of a
task, as this would cause an unpredictable behavior of the node that executes the task. Instead, they are stored within
the task using the setThrowable() method. This way, it is always possible for the application to know what happened.
The following code shows how JPPF handles uncaught throwables:
Task<?> task = ...;
try {
  task.run();
} catch(Throwable t) {
  task.setThrowable(t);
}
Then in the application, you can retrieve the throwable as follows:
Task<?> task = ...;
Throwable t = task.getThrowable();
if (t != null) {
  t.printStackTrace();
}
4.1.1.3 Task life cycle
JPPF provides some options to control a task's life cycle once it has been submitted, including the following:
• task cancellation: this cannot be invoked directly on a task, but is rather invoked as part of cancelling a whole job. If a
task is cancelled before its execution starts, then it will never start.
• task timeout: the timeout countdown starts with the task's execution. If a timeout expires before the task starts
executing, then the task will not time out.
In all cases, if a task has already completed its execution, it cannot be cancelled or timed out anymore.
Apart from timeout settings, controlling the life cycle of a task is normally done externally, using the JPPF remote
management facilities. We will see those later, in a dedicated chapter of this user manual.
It is possible to perform a specific processing when a task life cycle event occurs. For this, Task provides a callback
method for each type of event:
• public void onCancel(): invoked when the task is cancelled
• public void onTimeout(): invoked when the task times out
For both methods, an attempt to cancel the task will be performed, by calling Thread.interrupt() on the thread that
executes it, then the onCancel() or onTimeout() method will be invoked. The implication is that the callback
invocation takes place after the task's run() method returns, whether it was immediately interrupted (if the thread was
doing an interruptible operation) or not.
By default, these methods do not do anything. You can, however, override them to implement any application-specific
processing, such as releasing resources used by the task, updating the state of the task, etc.
Here is a code sample illustrating these concepts:
public class MyTask extends AbstractTask<Object> {
  @Override public void run() {
    // task processing here ...
  }
  @Override public void onCancel() {
    // process task cancel event ...
  }
  @Override public void onTimeout() {
    // process task timeout event ...
  }
}
A task timeout can be set by using a JPPFSchedule object, which is an immutable object that provides two constructors:
// schedule after a specified duration in milliseconds
public JPPFSchedule(final long duration)
// schedule at a specified fixed date/time
public JPPFSchedule(String date, String format)
Using a JPPFSchedule, we can thus set and obtain a task timeout using the corresponding accessors:
public interface Task<T> extends Runnable, Serializable {
  // get the timeout schedule
  public JPPFSchedule getTimeoutSchedule();
  // set a new timeout schedule
  public Task<T> setTimeoutSchedule(JPPFSchedule timeoutSchedule);
}
For example:
// set the task to expire after 5 seconds
myTask.setTimeoutSchedule(new JPPFSchedule(5000L));
// set the task to expire on 9/30/2012 at 12:08 pm
myTask.setTimeoutSchedule(new JPPFSchedule("09/30/2012 12:08 PM", "MM/dd/yyyy hh:mm a"));
4.1.2 Exception handling - node processing
It is possible that an error occurs while the node is processing a task, before or after its execution. These error
conditions include any instance of Throwable, i.e. any Exception or Error occurring during serialization or deserialization
of the tasks, or while sending or receiving data to or from the server.
When such an error occurs, the Throwable that was raised for each task in error is propagated back to the client which
submitted the job, and set upon the initial instance of the task in the job. It can then be obtained, upon receiving the
execution results, with a call to Task.getThrowable().
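As an illustration, here is a minimal sketch of how an application might check for node-side errors upon receiving the results, assuming a connected JPPFClient named client and a submitted job named job:
List<Task<?>> results = client.submit(job);
for (Task<?> task : results) {
  Throwable t = task.getThrowable();
  if (t != null) {
    // the task failed in the node: execution, serialization or transport error
    System.err.println("task " + task.getId() + " failed: " + t);
  } else {
    System.out.println("task " + task.getId() + " result: " + task.getResult());
  }
}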
4.1.3 Getting information on the node executing the task
A Task can obtain information on the node using the following methods:
public interface Task<T> extends Runnable, Serializable {
  // is the task executing in a node or in the client?
  public boolean isInNode()
  // get the node executing this task, if any
  public Node getNode()
}
The isInNode() method determines whether the task is executing within a node or within a client with local execution
enabled.
The getNode() method returns an instance of the Node interface, defined as follows:
public interface Node extends Runnable {
// Get this node's UUID
String getUuid();
// Get the system information for this node
JPPFSystemInformation getSystemInformation();
// Determine whether this node is local to another component
boolean isLocal();
// Determine whether this node is running in offline mode
boolean isOffline();
// Determine whether this node is a 'master' node for the provisioning features
boolean isMasterNode();
// Determine whether this node is a 'slave' node for the provisioning features
boolean isSlaveNode();
// Determine whether this node can execute .Net tasks
boolean isDotnetCapable();
// Determine whether this node is an Android node
boolean isAndroid();
// Get the JMX connector server associated with the node
JMXServer getJmxServer() throws Exception;
// Reset the current task class loader if any is present
ClassLoader resetTaskClassLoader(Object...params);
}
Note that Task.getNode() will return null if the task is executing within a client local executor.
Here is an example usage:
public class MyTask extends AbstractTask<String> {
@Override
public void run() {
if (isInNode()) {
setResult("executing in remote node with uuid = " + getNode().getUuid());
} else {
setResult("executing in client-local executor");
}
}
}
4.1.4 Executing code in the client from a task
The Task API provides a method that will allow a task to send code for execution in the client:
public interface Task<T> extends Runnable, Serializable {
  // send a callable for execution on the client side
  public <V> V compute(JPPFCallable<V> callable)
}
The method compute() takes a JPPFCallable as input, which is a Serializable extension of the Callable interface, and will be executed on the client side. The return value is the result of calling JPPFCallable.call() on the client side.
Example usage:
public class MyTask extends AbstractTask<String> {
  @Override public void run() {
    // execute a different callable, depending on whether
    // this task is executing in a JPPF node or in the client
    String callResult = compute(isInNode() ? new NodeCallable() : new ClientCallable());
    // set the callable result as this task's result
    setResult(callResult);
  }

  public static class NodeCallable implements JPPFCallable<String> {
    @Override public String call() throws Exception {
      return "executed in the NODE";
    }
  }

  public static class ClientCallable implements JPPFCallable<String> {
    @Override public String call() throws Exception {
      return "executed in the CLIENT";
    }
  }
}
4.1.5 Sending notifications from a task
Task provides an API which allows tasks to send notifications during their execution:
public interface Task<T> extends Runnable, Serializable {
  // Causes the task to send a notification to all listeners
  Task<T> fireNotification(Object userObject, boolean sendViaJmx);
}
The first parameter userObject can be any object provided by the user code. The second parameter sendViaJmx
specifies whether this notification should also be sent via the node's JPPFNodeTaskMonitorMBean, instead of only to
locally registered listeners. If it is true, it is recommended that userObject be Serializable. We will see in further
chapters of this documentation how to register local and JMX-based listeners. Let's see here how these listeners can
handle the notifications.
Here is an example of a JMX listener registered with one or more JPPFNodeTaskMonitorMBean instances:
public class MyTaskJmxListener implements NotificationListener {
@Override
public synchronized void handleNotification(Notification notif, Object handback) {
// cast to the JPPF notification type, then get and print the user object
Object userObject = ((TaskExecutionNotification) notif).getUserData();
System.out.println("received notification with user object = " + userObject);
// determine who sent this notification
boolean userNotif = ((TaskExecutionNotification) notif).isUserNotification();
System.out.println("this notification was sent by the "
+ (userNotif ? "user" : "JPPF node"));
}
}
A local TaskExecutionListener would be like this:
public class MyTaskLocalListener implements TaskExecutionListener {
  // task completion event sent by the node
  @Override
  public void taskExecuted(TaskExecutionEvent event) {
    TaskExecutionInfo info = event.getTaskInformation();
    System.out.println("received notification with task info = " + info);
  }

  // task notification event sent by user code
  @Override
  public void taskNotification(TaskExecutionEvent event) {
    Object userObject = event.getUserObject();
    System.out.println("received notification with user object = " + userObject);
  }
}
Consider the following task:
public class MyTask extends AbstractTask<Object> {
@Override
public void run() {
fireNotification("notification 1", false);
fireNotification("notification 2", true);
}
}
During the execution of this task, a MyTaskJmxListener instance would only receive “notification 2”, whereas a
MyTaskLocalListener instance would receive both “notification 1” and “notification 2”.
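For reference, here is one way such a JMX listener might be registered, using the standard javax.management API. This is only a sketch: the JMXMP service URL (host and port) and the task monitor MBean name "org.jppf:name=task.monitor,type=node" are assumptions which depend on your node configuration, and the calls below may throw checked exceptions that must be handled:
import javax.management.*;
import javax.management.remote.*;

// connect to the node's JMX server
JMXServiceURL url = new JMXServiceURL("service:jmx:jmxmp://localhost:12001");
JMXConnector connector = JMXConnectorFactory.connect(url);
MBeanServerConnection connection = connector.getMBeanServerConnection();
// register our listener with the node's task monitor MBean
ObjectName name = new ObjectName("org.jppf:name=task.monitor,type=node");
connection.addNotificationListener(name, new MyTaskJmxListener(), null, null);
// ... later, when done listening: connector.close();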
4.1.6 Resubmitting a task
The class AbstractTask also provides an API which allows a task to request that it be resubmitted by the server, instead of
being sent back to the client as an execution result. This can prove very useful, for instance, when a task must absolutely
complete successfully, but an error occurs during its execution. The API for this is defined as follows:
public abstract class AbstractTask<T> implements Task<T> {
  // Determine whether this task will be resubmitted by the server
  public boolean isResubmit()
  // Specify whether this task should be resubmitted by the server
  public Task<T> setResubmit(final boolean resubmit)
  // Get the maximum number of times a task can be resubmitted
  public int getMaxResubmits()
  // Set the maximum number of times a task can be resubmitted
  public Task<T> setMaxResubmits(int maxResubmits)
}
Note that the resubmit and maxResubmits attributes are transient, which means that when the task is executed in a remote node, they will be reset to their initial values of false and -1, respectively.
The maximum number of times a task can be resubmitted may be specified in two ways:
• in the job SLA via the maxTaskResubmits attribute
• with the task's setMaxResubmits() method; in this case any value >= 0 will override the job SLA's value
Finally, a task resubmission only works for tasks sent to a remote node, and will not work in the client's local executor.
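To illustrate, here is a minimal sketch of a task which requests a resubmission when its processing fails, assuming the job's SLA allows at least one resubmit:
public class ResubmittingTask extends AbstractTask<String> {
  @Override public void run() {
    try {
      // ... actual processing here ...
      setResult("success");
    } catch (Exception e) {
      // keep the error and ask the server to resubmit this task,
      // instead of returning it to the client as a failed result
      setThrowable(e);
      setResubmit(true);
    }
  }
}
The maximum number of resubmits could then be set on the job, for instance with job.getSLA().setMaxTaskResubmits(3).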
4.1.7 JPPF-annotated tasks
Another way to write a JPPF task is to take an existing class and annotate one of its public methods or constructors using
@JPPFRunnable.
Here is an example:
public class MyClass implements Serializable {
@JPPFRunnable
public String myMethod(int intArg, String stringArg) {
String s = "int arg = " + intArg + ", string arg = \"" + stringArg + "\"";
System.out.println(s);
return s;
}
}
We can see that we are simply using a POJO class, for which we annotated the myMethod() method with
@JPPFRunnable. At runtime, the arguments of the method will be passed when the task is added to a job, as illustrated
in the following example:
JPPFJob job = new JPPFJob();
Task<?> task = job.add(new MyClass(), 3, "string arg");
Here we simply add our annotated class as a task, setting the two arguments of the annotated method in the same call.
Note also that a Task<?> object is returned. It is generated by a mechanism that wraps the annotated class into a Task, which allows it to use most of the functionalities that come with it.
JPPF-annotated tasks present the following properties and constraints:
• if the annotated element is an instance (non-static) method, the annotated class must be serializable
• if the class is already an instance of Task, the annotation will be ignored
• if an annotated method has a return type (i.e. non void), the return value will be set as the task result
• it is possible to annotate a public method or constructor
• an annotated method can be static or non-static
• if the annotated element is a constructor or a static method, the first argument of JPPFJob.add() must be a Class
object representing the class that declares it.
• an annotated method or constructor can have any signature, with no limit on the number of arguments
• through the task-wrapping mechanism, a JPPF-annotated class benefits from the Task facilities described
previously, except for the callback methods onCancel() and onTimeout().
Here is an example using an annotated constructor:
public class MyClass implements Serializable {
@JPPFRunnable
public MyClass(int intArg, String stringArg) {
String s = "int arg = " + intArg + ", string arg = \"" + stringArg + "\"";
System.out.println(s);
}
}
JPPFJob job = new JPPFJob();
Task<?> task = job.add(MyClass.class, 3, "string arg");
Another example using an annotated static method:
public class MyClass implements Serializable {
@JPPFRunnable
public static String myStaticMethod(int intArg, String stringArg) {
String s = "int arg = " + intArg + ", string arg = \"" + stringArg + "\"";
System.out.println(s);
return s;
}
}
JPPFJob job = new JPPFJob();
Task<?> task = job.add(MyClass.class, 3, "string arg");
Note how, in the last 2 examples, we use MyClass.class as the first argument in JPPFJob.add().
4.1.8 Runnable tasks
Classes that implement java.lang.Runnable can be used as JPPF tasks without any modification. The run() method
will then be executed as the task's entry point. Here is an example:
public class MyRunnableClass implements Runnable, Serializable {
public void run() {
System.out.println("Hello from a Runnable task");
}
}
JPPFJob job = new JPPFJob();
Task<?> task = job.add(new MyRunnableClass());
The following rules apply to Runnable tasks:
• the class must be serializable
• if the class is already an instance of Task, or annotated with @JPPFRunnable, it will be processed as such
• through the task-wrapping mechanism, a Runnable task benefits from the Task facilities described in a previous
section, except for the callback methods onCancel() and onTimeout().
4.1.9 Callable tasks
In the same way as Runnable tasks, classes implementing java.util.concurrent.Callable can be directly
used as tasks. In this case, the call() method will be used as the task's execution entry point. Here's an example:
public class MyCallableClass implements Callable<String>, Serializable {
public String call() throws Exception {
String s = "Hello from a Callable task";
System.out.println(s);
return s;
}
}
JPPFJob job = new JPPFJob();
Task<?> task = job.add(new MyCallableClass());
The following rules apply to Callable tasks:
• the Callable class must be serializable
• if the class is already an instance of Task, is annotated with @JPPFRunnable, or implements Runnable, it will be
processed as such and the call() method will be ignored
• the return value of the call() method will be set as the task result
• through the task-wrapping mechanism, a callable class benefits from the Task facilities described in a previous
section, except for the callback methods onCancel() and onTimeout().
4.1.10 POJO tasks
The most unintrusive way of defining a task is by simply using an existing POJO class without any modification. This will
allow you to use existing classes directly even if you don't have the source code. A POJO task offers the same
possibilities as a JPPF annotated task (see section JPPF-annotated tasks), except for the fact that we need to specify
explicitly which method or constructor to use when adding the task to a job. To this effect, we use a different form of the
method JPPFJob.add(), which takes a method or constructor name as its first argument.
Here is a code example illustrating these possibilities:
public class MyClass implements Serializable {
public MyClass(int intArg, String stringArg) {
String s = "int arg = " + intArg + ", string arg = \"" + stringArg + "\"";
System.out.println(s);
}
public String myMethod(int intArg, String stringArg) {
String s = "int arg = " + intArg + ", string arg = \"" + stringArg + "\"";
System.out.println(s);
return s;
}
public static String myStaticMethod(int intArg, String stringArg) {
String s = "int arg = " + intArg + ", string arg = \"" + stringArg + "\"";
System.out.println(s);
return s;
}
}
JPPFJob job = new JPPFJob();
// add a task using the constructor as entry point
Task<?> task1 = job.add("MyClass", MyClass.class, 3, "string arg");
// add a task using an instance method as entry point
Task<?> task2 = job.add("myMethod", new MyClass(), 3, "string arg");
// add a task using a static method as entry point
Task<?> task3 = job.add("myStaticMethod", MyClass.class, 3, "string arg");
POJO tasks present the following properties and constraints:
• if the entry point is an instance (non-static) method, the class must be serializable
• if a method has a return type (i.e. non void), the return value will be set as the task result
• it is possible to use a public method or constructor as entry point
• a method entry point can be static or non-static
• A POJO task is added to a job by calling a JPPFJob.add() method whose first argument is the method or
constructor name.
• if the entry point is a constructor or a static method, the second argument of JPPFJob.add() must be a Class object
representing the class that declares it.
• the entry point method or constructor can have any signature, with no limit on the number of arguments
• through the task-wrapping mechanism, a POJO task benefits from the Task facilities described previously, except for
the callback methods onCancel() and onTimeout().
4.1.11 Interruptibility
Sometimes, it is not desirable that the thread executing a task be interrupted upon a timeout or cancellation request. To
control this behavior, a task should override its isInterruptible() method, as in this example:
public class MyTask extends AbstractTask<Object> {
  @Override public void run() {
    // ...
  }
  @Override public boolean isInterruptible() {
    // this task can't be interrupted
    return false;
  }
}
Note that, by default, a task which does not override its isInterruptible() method is interruptible.
Tasks that do not extend AbstractTask, such as Callable, Runnable, POJO tasks or tasks annotated with @JPPFRunnable,
will need to implement the Interruptibility interface to override the interruptible flag, as in this example:
public class MyCallable implements Callable<String>, Serializable, Interruptibility {
  @Override public String call() throws Exception {
    return "task result";
  }
  @Override public boolean isInterruptible() {
    return false; // NOT interruptible
  }
}
4.1.12 Cancellation handler
Tasks that need a callback invoked immediately upon cancellation, whether the thread interruption succeeded or not, can
implement the CancellationHandler interface, defined as follows:
public interface CancellationHandler {
// Invoked immediately when a task is cancelled
void doCancelAction() throws Exception;
}
This can be used on its own, or in combination with the onCancel() method as in this example:
public class MyTask extends AbstractTask<Object> implements CancellationHandler {
  private long start;

  @Override
  public void run() {
    start = System.nanoTime();
    // ... task processing ...
  }

  @Override
  public void doCancelAction() throws Exception {
    long elapsed = (System.nanoTime() - start) / 1_000_000L;
    System.out.println("doCancelAction() called after " + elapsed + " ms");
  }

  @Override
  public void onCancel() {
    long elapsed = (System.nanoTime() - start) / 1_000_000L;
    System.out.println("onCancel() called after " + elapsed + " ms");
  }
}
The interface applies to all types of tasks: tasks that extend AbstractTask, POJO tasks, Callable and Runnable tasks,
along with @JPPFRunnable-annotated tasks.
4.1.13 Running non-Java tasks: CommandLineTask
JPPF has a pre-defined task type that allows you to run an external process from a task. This process can be any
executable program (including java), shell script or command. The JPPF API also provides a set of simple classes to
access data, whether in-process or outside, local or remote.
The class that will allow you to run a process is CommandLineTask. Like AbstractTask, it is an abstract class that you
must extend and whose run() method you must override.
This class provides methods to:
Set up the external process name, path, arguments and environment:
public abstract class CommandLineTask<T> extends AbstractTask<T> {
  // list of commands passed to the shell
  public List<String> getCommandList()
  public CommandLineTask<T> setCommandList(List<String> commandList)
  public CommandLineTask<T> setCommandList(String... commands)
  // set of environment variables
  public Map<String, String> getEnv()
  public CommandLineTask<T> setEnv(Map<String, String> env)
  // directory in which the command is executed
  public String getStartDir()
  public CommandLineTask<T> setStartDir(String startDir)
}
You can also use the built-in constructors to do this at task initialization time:
public abstract class CommandLineTask<T> extends AbstractTask<T> {
  public CommandLineTask(Map<String, String> env, String startDir, String... commands)
  public CommandLineTask(String... commands)
}
Launch the process:
The process is launched by calling the following method from the run() method of the task:
// launch the process and return its exit code
public int launchProcess()
This method will block until the process has completed or is destroyed. The process exit code can also be obtained via
the following method:
// get the process exit code
public int getExitCode()
Set up the capture of the process output:
You can specify and determine whether the process output (either standard or error console output) is or should be
captured, and obtain the captured output:
public abstract class CommandLineTask<T> extends AbstractTask<T> {
  public boolean isCaptureOutput()
  public CommandLineTask<T> setCaptureOutput(boolean captureOutput)
  // corresponds to what is sent to System.out / stdout
  public String getStandardOutput()
  // corresponds to what is sent to System.err / stderr
  public String getErrorOutput()
}
Here is a sample command line task that lists the content of a directory in the node's file system:
import org.jppf.server.protocol.*;
// This task lists the files in a directory of the node's host
public class ListDirectoryTask extends CommandLineTask<String> {
// Execute the script
@Override
public void run() {
try {
// get the name of the node's operating system
String os = System.getProperty("os.name").toLowerCase();
// the type of OS determines which command to execute
if (os.indexOf("linux") >= 0) {
setCommandList("ls", "-a", "/usr/local");
} else if (os.indexOf("windows") >= 0) {
setCommandList("cmd", "/C", "dir", "C:\\Windows");
}
// enable the capture of the console output
setCaptureOutput(true);
// execute the script/command
launchProcess();
// get the resulting console output and set it as a result
String output = getStandardOutput();
setResult(output);
} catch(Exception e) {
setThrowable(e);
}
}
}
4.1.14 Executing dynamic scripts: ScriptedTask
The class ScriptedTask allows you to execute scripts written in any dynamic language available via the javax.script APIs.
It is defined as follows:
public class ScriptedTask<T> extends AbstractTask<T> {
  // Initialize this task with the specified language, script provided as a string
  // and set of variable bindings
  public ScriptedTask(String language, String script, String reusableId,
    Map<String, Object> bindings) throws IllegalArgumentException
  // Initialize this task with the specified language, script provided as a reader
  // and set of variable bindings
  public ScriptedTask(String language, Reader scriptReader, String reusableId,
    Map<String, Object> bindings) throws IllegalArgumentException, IOException
  // Initialize this task with the specified language, script provided as a file
  // and set of variable bindings
  public ScriptedTask(String language, File scriptFile, String reusableId,
    Map<String, Object> bindings) throws IllegalArgumentException, IOException
  // Get the JSR 223 script language to use
  public String getLanguage()
  // Get the script to execute from this task
  public String getScript()
  // Get the unique identifier for the script
  public String getReusableId()
  // Get the user-defined variable bindings
  public Map<String, Object> getBindings()
  // Add the specified variable to the user-defined bindings
  public ScriptedTask<T> addBinding(String name, Object value)
  // Remove the specified variable from the user-defined bindings
  public Object removeBinding(String name)
}
Since ScriptedTask is a subclass of AbstractTask, it has all the features that come with it, including life cycle
management, error handling, etc. There is a special processing for Throwables raised by the script engine: some engines
raise throwables which are not serializable, which may prevent JPPF from capturing them and return them back to the
client application. To work around this, JPPF will instantiate a new exception with the same message and stack trace as
the original exception. Thus some information may be lost, and you may need to handle these exceptions from within the
scripts to retrieve this information.
The reusableId parameter, provided in the constructors, indicates that, if the script engine has that capability, compiled scripts will be stored and reused, to avoid compiling the same scripts repeatedly. In a multithreaded context, as is the case in a JPPF node, multiple compilations may still occur for the same script, since it is not possible to guarantee the thread-safety of a script engine, and compiled scripts are always associated with a single script engine instance. Thus, a script may be compiled multiple times, but normally no more than there are processing threads in the node.
Java objects can be passed as variables to the script via the bindings, either in one of the constructors or using the
addBinding() and removeBinding() methods. Additionally, a ScriptedTask always adds a reference to itself with the name
“jppfTask”, or the equivalent in the chosen script language, for instance $jppfTask in PHP.
The value returned by the script, if any, will be set as the task result, unless it has already been set to a non-null value, by
calling jppfTask.setResult(...) from within the script.
For example, consider the following JavaScript script:
function myFunc() {
jppfTask.setResult('Hello 1');
return 'Hello 2';
}
myFunc();
The result of the evaluation of the script is the string “Hello 2”. However, the task result will be “Hello 1”, since it was set before the end of the script. If you comment out the first statement of the function (the jppfTask.setResult() statement), then the task result will be the same as the script result, “Hello 2”.
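As a usage sketch, here is how such a task might be created and added to a job, assuming a JavaScript engine is available via the javax.script API and the binding name "greeting" is our own choice:
Map<String, Object> bindings = new HashMap<>();
bindings.put("greeting", "hello");
// the script can use both the 'greeting' binding and the implicit 'jppfTask' reference
String script = "greeting + ' from task ' + jppfTask.getId();";
ScriptedTask<String> task =
  new ScriptedTask<>("javascript", script, "myReusableId", bindings);
JPPFJob job = new JPPFJob();
job.add(task);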
4.2 Dealing with jobs
A job is a grouping of tasks with a common set of characteristics and a common SLA. These characteristics include:
• common data shared between tasks (data provider)
• A common Service Level Agreement (SLA) comprising:
– the job priority
– the maximum number of nodes a job can be executed on
– an optional execution policy describing which nodes the job can run on
– a suspended indicator, that enables submitting a job in suspended state, waiting for an external command to
resume or start its execution
– an execution start date and time
– an expiration (timeout) date and time
– an indicator specifying what the server should do when the application is disconnected
• a blocking/non-blocking indicator, specifying whether the job execution is synchronous or asynchronous from the
application's point of view
• a listener to receive notifications of completed tasks when running in non-blocking mode
• the ability to receive notifications when the job execution starts and completes
• a persistence manager, to store the job state during execution, and recover its latest saved state on demand, in
particular after an application crash
In the JPPF API, a job is represented by the class JPPFJob. In addition to accessors and mutators for the attributes we
have seen above, JPPFJob provides methods to add tasks and a set of constructors that make the creation of jobs easier.
4.2.1 Creating a job
To create a job, JPPFJob has a single no-arg constructor, which generates a unique universal identifier for the job:
public class JPPFJob extends AbstractJPPFJob
  implements Iterable<Task<?>>, Future<List<Task<?>>> {
  // creates a job with default values for its attributes
  public JPPFJob()
  // get the UUID of this job
  public String getUuid()
}
The job UUID is automatically generated as a pseudo-random string of hexadecimal characters in the standard 8-4-4-12
format. It can then be obtained with the job's getUuid() method.
Important note: the role of the job UUID is critical, since it is used to distinguish the job from potentially many others in
all JPPF grid topologies. It is also used in most job management and monitoring operations.
Each job also has a name, which can be used to identify a job in a human readable way. When a job is created, its name
is set to the job UUID. It can later be changed or accessed with the following accessors:
public class JPPFJob extends AbstractJPPFJob
  implements Iterable<Task<?>>, Future<List<Task<?>>> {
  // get the name of this job
  public String getName()
  // assign a name to this job
  public void setName(String name)
}
Note that the job's name is displayed in the "job data" view of the JPPF graphical administration console.
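Putting it together, a short sketch:
JPPFJob job = new JPPFJob();
// the name defaults to the job uuid until explicitly set
System.out.println("uuid = " + job.getUuid() + ", name = " + job.getName());
job.setName("my first job");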
4.2.2 Adding tasks to a job
As we have seen in section 4.1, there are various forms of tasks that we can use in JPPF. Accordingly, JPPFJob provides
two methods to add tasks to a job.
Adding a Task, annotated, Runnable or Callable task
public Task<?> add(Object taskObject, Object...args) throws JPPFException
The taskObject parameter can be one of the following:
• an instance of Task
• an instance of a class with a non-static public method annotated with @JPPFRunnable
• a Class object representing a class that has a public static method or a constructor annotated with @JPPFRunnable
• an instance of a Runnable class
• an instance of a Callable class
The args parameter is optional and is only used to pass the arguments of a method or constructor annotated with
@JPPFRunnable. It is ignored for all other forms of tasks.
The return value is an instance of Task<?>, regardless of the type of task that is added. In the case of an annotated,
Runnable or Callable task, the original task object, wrapped by this Task, can be retrieved using the method
Task.getTaskObject(), as in the following example:
Task<?> task = job.add(new MyRunnableTask());
MyRunnableTask runnableTask = (MyRunnableTask) task.getTaskObject();
As JPPF uses reflection to properly wrap the task, an exception may be thrown in the process; it is then wrapped in a JPPFException.
Adding a POJO task
public Task<?> add(String method, Object taskObject, Object...args) throws JPPFException
The method parameter is the name of the method or of the constructor to execute as the entry point of the task. In the
case of a constructor, it must be the same as the name of the class.
The taskObject parameter can be one of the following:
• an instance of the POJO class if the entry point is a non-static method
• a Class object representing a POJO class that has a public static method or a constructor as entry point
The optional args parameter is used to pass the arguments of a method or constructor defined as the task's entry point.
As for the other form of this method, the return value is a Task<?>, and the original task object can be retrieved using
the method Task.getTaskObject(), as in the following example:
Task<?> task = job.add("myMethod", new MyPOJO(), 3, "string");
MyPOJO pojo = (MyPOJO) task.getTaskObject();
// we can also set a timeout on the wrapper
task.setTimeoutSchedule(new JPPFSchedule(5000L));
As JPPF uses reflection to properly wrap the task, an exception may be thrown in the process; it is then wrapped in a JPPFException.
4.2.3 Inspecting the tasks of a job
JPPFJob provides two ways to get and inspect its tasks: one way is to call the method getJobTasks() to obtain the list
of tasks, the other is to take advantage of JPPFJob implementing Iterable<Task<?>>.
For example, the following two ways to iterate over the tasks in a job are equivalent:
JPPFJob myJob = ...;
// get the list of tasks in the job and iterate over them
for (Task<?> task: myJob.getJobTasks()) {
  // do something ...
}
// iterate over the job directly
for (Task<?> task: myJob) {
  // do something ...
}
4.2.4 Non-blocking jobs
Jobs can be submitted asynchronously from the application's perspective. This means that an asynchronous (or non-blocking) job will not block the application thread from which it is submitted. It also implies that we must have the means to obtain the execution results at a later time.
The blocking attribute of a job is accessed with the following getter and setter:
public class JPPFJob extends AbstractJPPFJob
  implements Iterable<Task<?>>, Future<List<Task<?>>> {
  // Determine whether the execution of this job is blocking on the client side
  public boolean isBlocking()
  // Specify whether the execution of this job is blocking on the client side
  public void setBlocking(final boolean blocking)
}
Note that a job is blocking by default, therefore you must explicitly call setBlocking(false) before submitting it, to
make it an asynchronous job.
4.2.5 Job submission
Jobs are submitted with the JPPFClient API, as seen later on in this manual. What is important to retain is that the
immediate outcome of a job submission is different for blocking and non-blocking jobs, as illustrated in the following
examples:
JPPFClient client = ...;
// a new job is blocking by default
JPPFJob blockingJob = new JPPFJob();
blockingJob.add(new MyTask());
// blocks until the job has completed
List<Task<?>> results = client.submit(blockingJob);
JPPFJob nonBlockingJob = new JPPFJob();
nonBlockingJob.setBlocking(false);
nonBlockingJob.add(new MyTask());
// returns null immediately, without blocking the current thread
client.submit(nonBlockingJob);
// ... later on, collect the results
List<Task<?>> results2 = nonBlockingJob.awaitResults();
4.2.6 Job execution results
JPPFJob provides the following methods to explore and obtain the execution results of its tasks:
public class JPPFJob extends AbstractJPPFJob
  implements Iterable<Task<?>>, Future<List<Task<?>>> {
  // Get the count of tasks in this job that have completed
  public int executedTaskCount()
  // Get the count of tasks in this job that haven't yet been executed
  public int unexecutedTaskCount()
  // Wait until all execution results of the tasks in this job have been collected
  public List<Task<?>> awaitResults()
  // Wait until all execution results of the tasks in this job have been collected
  // or the timeout expires, whichever happens first
  public List<Task<?>> awaitResults(final long timeout)
  // Get the list of currently available task execution results
  public List<Task<?>> getAllResults()
  // Get the execution status of this job
  public JobStatus getStatus()
  // determine whether this job was cancelled
  public boolean isCancelled()
  // determine whether this job has completed normally or was cancelled
  public boolean isDone()
  // wait until the job is done
  public List<Task<?>> get() throws InterruptedException, ExecutionException
  // wait until the job is done or the timeout expires, whichever happens first
  public List<Task<?>> get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException
}
Note that the awaitResults() methods will block until the job has completed, or the timeout expires if any is specified.
If the timeout expires, an incomplete list of results will be returned. By contrast, getAllResults() will return
immediately with a partial list of task execution results, possibly empty if no result was received yet.
The getStatus() method returns an indication of the job's completion status, as one of the values defined in the
JobStatus enum:
public enum JobStatus {
// The job was just submitted
SUBMITTED,
// The job is currently in the submission queue (on the client side)
PENDING,
// The job is being executed
EXECUTING,
// The job execution is complete
COMPLETE,
// The job execution has failed
FAILED
}
A notable difference between the awaitResults(long) and get(long, TimeUnit) methods is that the get(...) method will throw a TimeoutException whenever the specified timeout expires before the job completes. Other than that, awaitResults(timeout) is equivalent to get(timeout, TimeUnit.MILLISECONDS).
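As an illustration, here is a minimal sketch of collecting the results of a non-blocking job with a bounded wait, assuming a connected JPPFClient named client:
JPPFJob job = new JPPFJob();
job.setBlocking(false);
job.add(new MyTask());
client.submit(job);
// wait at most 5 seconds for all the results
List<Task<?>> results = job.awaitResults(5_000L);
if (job.getStatus() == JobStatus.COMPLETE) {
  // all results received
  System.out.println("got " + results.size() + " results");
} else {
  // partial results; poll again later or keep waiting
  System.out.println("job status: " + job.getStatus());
}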
4.2.7 Cancelling a job
Cancelling a job can be performed with the cancel() and cancel(boolean mayInterruptIfRunning) methods of
JPPFJob. The mayInterruptIfRunning flag specifies whether the job can be cancelled while it is being executed: if
the flag is false and the job is executing, then it will not be cancelled and the cancel(...) method will return false.
Note that the return value of isCancelled() will reflect the cancelled state of the job, but only if it was cancelled within
the scope of the JPPF client application: with the JPPFClient API, or JPPFJob.cancel(boolean), or as the result of
setting an expiration schedule in the job's client SLA.
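For example, a minimal sketch:
// request cancellation, interrupting the job even if it is already executing
boolean cancelRequested = job.cancel(true);
System.out.println("cancelled: " + job.isCancelled() + ", done: " + job.isDone());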
4.3 Jobs runtime behavior, recovery and failover
4.3.1 Failover and job re-submission
When the connection with the JPPF server is broken, the client application becomes unable to receive any more results
for the jobs it has submitted and which are still executing. When this happens, the default behavior for the JPPF client is
to resubmit the job, so that it will either be sent to another available server, or wait in the client's queue until the
connection is re-established.
There can be some side effects to this behavior, which should be carefully accounted for when designing your tasks. In
effect, the fact that a task result was not received by the client doesn't necessarily mean the task was not executed on a
node. This implies that a task may be executed more than once on the grid, as the client has no way of knowing this. In
particular, if the task performs persistent operations, such as updating a database or writing to a file system, this may lead
to unexpected results whenever the task is executed again.
4.3.2 Job persistence and recovery
The entire state of a job can be persisted by associating a persistence manager to the job. A persistence manager is an
implementation of the JobPersistence interface, defined as follows:
package org.jppf.client.persistence;

public interface JobPersistence<K> {
  // Compute the key for the specified job. All calls to this method
  // with the same job instance should always return the same result.
  K computeKey(JPPFJob job);
  // Get the keys of all jobs in the persistence store
  Collection<K> allKeys() throws JobPersistenceException;
  // Load a job from the persistence store given its key
  JPPFJob loadJob(K key) throws JobPersistenceException;
  // Store the specified tasks of the specified job with the specified key.
  // The list of tasks may be used to only store the delta, for better performance
  void storeJob(K key, JPPFJob job, List<Task<?>> tasks)
    throws JobPersistenceException;
  // Delete the job with the specified key from the persistence store
  void deleteJob(K key) throws JobPersistenceException;
  // Close this store and release any used resources
  void close();
}
As we can see, the persistence manager relies on keys that will allow it to uniquely identify jobs in the persistent store.
The type of store is implementation-dependent, and can be any storage device or facility, for example a file system, a
database, a cloud storage facility, a distributed cache, etc...
The JPPFJob class provides the following getter and setter for the persistence manager:
public class JPPFJob implements Serializable, JPPFDistributedJob {
  // Get the persistence manager
  public JobPersistence<?> getPersistenceManager()
  // Set the persistence manager
  public void setPersistenceManager(final JobPersistence<?> persistenceManager)
}
JPPF provides a ready-to-use implementation of JobPersistence: the class DefaultFilePersistenceManager. This
implementation stores the jobs on the file system. Each job, with its attributes and tasks, is saved in a single file, using
Java serialization. The key associated with each job is the job's uuid (see JPPFJob.getUuid() method). It can be
instantiated using one of the following constructors:
public class DefaultFilePersistenceManager implements JobPersistence<String> {
  // Initialize with the specified root path, using default file prefix and extension
  public DefaultFilePersistenceManager(File root)
  // Initialize with the specified root path, file prefix and extension
  public DefaultFilePersistenceManager(File root, String prefix, String ext)
  // Initialize with the specified root path, using default file prefix and extension
  public DefaultFilePersistenceManager(String root)
  // Initialize with the specified root path, file prefix and extension
  public DefaultFilePersistenceManager(String root, String prefix, String ext)
}
Note that DefaultFilePersistenceManager will use the serialization scheme configured for the client.
Finally, this persistence manager is shown in action in the Job Recovery related sample.
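As a usage sketch, assuming jobs are persisted under a local "persistence" directory (a path chosen for this example):
// associate a file-based persistence manager with the job before submitting it
JobPersistence<String> pm = new DefaultFilePersistenceManager("persistence");
JPPFJob job = new JPPFJob();
job.setPersistenceManager(pm);

// later, e.g. after an application restart, recover the stored jobs
try {
  for (String uuid : pm.allKeys()) {
    JPPFJob recovered = pm.loadJob(uuid);
    // ... inspect or resubmit the recovered job ...
  }
} catch (JobPersistenceException e) {
  e.printStackTrace();
}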
4.3.3 Job lifecycle notifications: JobListener
It is possible to receive notifications for when a job is being started (i.e. sent to the server), when its execution is
completed (results have been received for all tasks), when a subset of its tasks is dispatched for execution and when a
subset of its tasks has returned from execution. This is done by registering instances of the JobListener interface with
the job, defined as follows:
// Listener interface for receiving job execution event notifications
public interface JobListener extends EventListener {
// Called when a job is sent to the server, or its execution starts locally
void jobStarted(JobEvent event);
// Called when the execution of a job is complete
void jobEnded(JobEvent event);
// Called when a job, or a subset of its tasks, is sent to the server,
// or to the local executor
void jobDispatched(JobEvent event);
// Called when the execution of a subset of a job is complete
void jobReturned(JobEvent event);
}
Please note that jobDispatched() and jobReturned() may be called in parallel by multiple threads, in the case
where the JPPF client has multiple connections in its configuration. This happens if the client uses multiple connections to
the same server, connections to multiple servers, or a mix of connections to remote servers and a local executor. You will
need to synchronize any operation that is not thread-safe within these methods.
In a normal execution cycle, jobStarted() and jobEnded() will be called only once for each job, whereas
jobDispatched() and jobReturned() may be called multiple times, depending on the number of available
connections, the load-balancing configuration on the client side, and the job's client-side SLA.
Additionally, the built-in job failover mechanism may cause the jobStarted() and jobEnded() callbacks to be invoked
multiple times, for instance in the case where the connection to the server is lost, causing the job to be re-submitted.
Note: it is recommended to only change the job SLA or metadata during the jobStarted() notification. Making
changes in the other notifications will lead to unpredictable results and may cause the job to fail.
The notifications are sent as instances of JobEvent, which is defined as:
// Event emitted by a job when its execution starts or completes
public class JobEvent extends EventObject {
  // Get the job source of this event
  public JPPFJob getJob()
  // Get the tasks that were dispatched or returned
  public List<Task<?>> getJobTasks()
  // Whether the current job dispatch is sent to a remote driver
  public boolean isRemoteExecution()
  // Get the connection used to send the job dispatch to a remote driver
  public JPPFClientConnection getConnection()
}
Note that the getJobTasks() method is only useful for jobDispatched() and jobReturned() notifications. In all other cases, it will return null. The same applies to the methods isRemoteExecution() and getConnection(). Furthermore, getConnection() will also return null if isRemoteExecution() returns false, that is, if the job dispatch is executed in the client-local executor.
To add or remove listeners, use the related methods in JPPFJob:
public class JPPFJob implements Serializable, JPPFDistributedJob {
// Add a job listener
public void addJobListener(JobListener listener)
// Remove a job listener
public void removeJobListener(JobListener listener)
}
A possible use of these listeners is to “intercept” a job before it is sent to the server, and adjust some of its attributes, such as the SLA specifications, which may vary depending on the time at which the job is started or on an application-dependent context. It can also be used to collect the results of non-blocking jobs in a fully asynchronous way.
If you do not need to implement all the methods of JobListener, your implementation may instead extend the class
JobListenerAdapter, which provides an empty implementation of each method in the interface.
Multi-threaded usage note: if you intend to use the same JobListener instance from multiple threads, for instance with
multiple concurrent non-blocking jobs, you will need to explicitly synchronize the code of the listener.
Here is a simple example of a thread-safe JobListener implementation:
// counts the total submitted and executed tasks for all jobs
public class MyJobListener extends JobListenerAdapter {
  private int totalSubmittedTasks = 0;
  private int totalExecutedTasks = 0;

  @Override
  public synchronized void jobStarted(JobEvent event) {
    JPPFJob job = event.getJob();
    // add the number of tasks in the job
    totalSubmittedTasks += job.getJobTasks().size();
    System.out.println("job started: submitted = " + totalSubmittedTasks +
      ", executed = " + totalExecutedTasks);
  }

  @Override
  public synchronized void jobReturned(JobEvent event) {
    List<Task<?>> tasks = event.getJobTasks();
    // add the number of task results received
    totalExecutedTasks += tasks.size();
    System.out.println("job returned: submitted = " + totalSubmittedTasks +
      ", executed = " + totalExecutedTasks);
  }
}
4.4 Sharing data among tasks : the DataProvider API
After a job is submitted, the server will distribute the tasks in the job among the nodes of the JPPF grid. Generally, more
than one task may be sent to each node. Given the communication and serialization protocols implemented in JPPF,
objects referenced by multiple tasks at submission time will be deserialized as multiple distinct instances at the time of
execution in the node. This means that, if n tasks reference object A at submission time, the node will actually deserialize
multiple copies of A, with Task1 referencing A1, … , Taskn referencing An. We can see that, if the shared object is very
large, we will quickly face memory issues.
To resolve this problem, JPPF provides a mechanism called data provider that enables sharing common objects among
tasks in the same job. A data provider is an instance of a class that implements the interface DataProvider. Here is the
definition of this interface:
public interface DataProvider extends Metadata {
  // @deprecated: use getParameter(Object) instead
  <T> T getValue(final Object key) throws Exception;
  // @deprecated: use setParameter(Object, Object) instead
  void setValue(Object key, Object value) throws Exception;
}
As we can see, the two methods in the interface are deprecated, but kept to preserve compatibility with applications written with a JPPF version prior to 4.0. The actual API is defined in the Metadata interface as follows:
public interface Metadata extends Serializable {
  // Retrieve a parameter in the metadata
  <T> T getParameter(Object key);
  // Return a parameter in the metadata, or a default value if not found
  <T> T getParameter(Object key, T def);
  // Set or replace a parameter in the metadata
  void setParameter(Object key, Object value);
  // Remove a parameter from the metadata
  <T> T removeParameter(Object key);
  // Get the metadata map
  Map<Object, Object> getAll();
  // Clear all the metadata
  void clear();
}
This is indeed a basic object map interface: you can store objects and associate them with a key, then retrieve these
objects using the associated key.
Here is an example of using a data provider in the application:
MyLargeObject myLargeObject = ...;
// create a data provider backed by a Hashtable
DataProvider dataProvider = new MemoryMapDataProvider();
// store the shared object in the data provider
dataProvider.setParameter("myKey", myLargeObject);
JPPFJob job = new JPPFJob();
// associate the dataProvider with the job
job.setDataProvider(dataProvider);
job.add(new MyTask());
and in a task implementation:
public class MyTask extends AbstractTask<Object> {
public void run() {
// get a reference to the data provider
DataProvider dataProvider = getDataProvider();
// retrieve the shared data
MyLargeObject myLargeObject = dataProvider.getParameter("myKey");
// ... use the data ...
}
}
Note 1: the association of a data provider to each task is done automatically by JPPF and is totally transparent to the
application.
Note 2: from each task's perspective, the data provider should be considered read-only. Modifications to the data provider, such as adding or modifying values, will NOT be propagated beyond the scope of the node. Hence, a data provider cannot be used as a common data store for the tasks. Its only goal is to avoid excessive memory consumption and improve the performance of the job serialization.
4.4.1 MemoryMapDataProvider: map-based provider
MemoryMapDataProvider is a very simple implementation of the DataProvider interface. It is backed by a
java.util.Hashtable. It can be used safely from multiple concurrent threads.
4.4.2 Data provider for non-JPPF tasks
By default, tasks whose class does not implement Task do not have access to the DataProvider that is set on a job.
This includes tasks that implement Runnable or Callable (including those submitted with a JPPFExecutorService),
annotated with @JPPFRunnable, and POJO tasks.
JPPF provides a mechanism which enables non-JPPF tasks to gain access to the DataProvider. To this effect, the
task must implement the interface DataProviderHolder, defined as follows:
package org.jppf.client.taskwrapper;
import org.jppf.task.storage.DataProvider;
// This interface must be implemented by tasks that are not subclasses
// of JPPFTask when they need access to the job's DataProvider
public interface DataProviderHolder {
// Set the data provider for the task
void setDataProvider(DataProvider dataProvider);
}
Here is an example implementation:
public class MyTask
  implements Callable<String>, Serializable, DataProviderHolder {
  // DataProvider set onto this task
  private transient DataProvider dataProvider;

  @Override
  public String call() throws Exception {
    String result = dataProvider.getParameter("myKey");
    System.out.println("got value " + result);
    return result;
  }

  @Override
  public void setDataProvider(final DataProvider dataProvider) {
    this.dataProvider = dataProvider;
  }
}
Note that the “dataProvider” attribute is declared transient, to prevent the DataProvider from being serialized along with the task when it is sent back to the server after execution. Another way to achieve this would be to set it to null at the end of the call() method, for instance in a try {} finally {} block.
4.5 Job Service Level Agreement
A job service level agreement (SLA) defines the terms and conditions in which a job will be processed. A job carries two
distinct SLAs, one which defines a contract between the job and the JPPF server, the other defining a different contract
between the job and the JPPF client.
Server and client SLAs have common attributes, which specify:
• the characteristics of the nodes it can run on (server side), or of the channels it can be sent through (client side): the
job execution policy
• the time at which a job is scheduled to start
• an expiration date for the job
The attributes specific to the server side SLA are:
• the priority of a job
• whether it is submitted in suspended state
• the maximum number of nodes it can run on
• whether the job is a standard or broadcast job
• whether the server should immediately cancel the job, if the client that submitted it is disconnected
The attributes specific to the client side SLA are:
• the maximum number of channels it can be sent through
A job SLA is represented by the class JobSLA for the server side SLA, and by the class JobClientSLA for the
client side SLA. It can be accessed from a job using the related getters and setters:
public class JPPFJob extends AbstractJPPFJob
  implements Iterable<Task<?>>, Future<List<Task<?>>> {
  // The job's server-side SLA
  public JobSLA getSLA()
  // The job's client-side SLA
  public JobClientSLA getClientSLA()
}
Example usage:
JPPFJob myJob = new JPPFJob();
myJob.getClientSLA().setMaxChannels(2);
myJob.getSLA().setPriority(1000);
Also note that both classes extend the common base class JobCommonSLA. We will go into the details of these classes
in the following sections.
4.5.1 Attributes common to server and client side SLAs
As seen previously, the common attributes for server and client side SLAs are defined by the JobCommonSLA class:
public class JobCommonSLA<T extends JobCommonSLA> implements Serializable {
  // The execution policy
  public ExecutionPolicy getExecutionPolicy();
  public T setExecutionPolicy(ExecutionPolicy executionPolicy);
  // The job start schedule
  public JPPFSchedule getJobSchedule();
  public T setJobSchedule(JPPFSchedule jobSchedule);
  // The job expiration schedule
  public JPPFSchedule getJobExpirationSchedule();
  public T setJobExpirationSchedule(JPPFSchedule jobExpirationSchedule);
}
4.5.1.1 Execution policy
An execution policy is an object that determines whether a particular set of JPPF tasks can be executed on a JPPF node
(for the server-side SLA) or if it can be sent via a communication channel (for the client-side). It does so by applying the
set of rules (or tests) it is made of, against a set of properties associated with the node or channel.
For a fully detailed description of how to create and use execution policies, please read the Execution policies section of
this development guide.
Example usage:
// define a non-trivial server-side execution policy:
// execute on nodes that have at least 2 threads and whose IPv4 address
// is in the 192.168.1.nnn subnet
ExecutionPolicy serverPolicy = new AtLeast("processing.threads", 2).and(
new Contains("ipv4.addresses", true, "192.168.1."));
// define a client-side execution policy:
// submit to the client local executor or to drivers whose IPv4 address
// is in the 192.168.1.nnn subnet
ExecutionPolicy clientPolicy = new Equal("jppf.channel.local", true).or(
new Contains("ipv4.addresses", true, "192.168.1."));
JPPFJob job = new JPPFJob();
// set the server-side policy
JobSLA sla = job.getSLA().setExecutionPolicy(serverPolicy);
// set the client-side policy
JobClientSLA clientSla = job.getClientSLA().setExecutionPolicy(clientPolicy);
// print an XML representation of the server-side policy
System.out.println("server policy is:\n" + sla.getExecutionPolicy());
4.5.1.2 Job start and expiration scheduling
It is possible to schedule a job for a later start, and also to set a job for expiration at a specified date/time. The job SLA
allows this by providing the following methods:
// job start schedule
public JPPFSchedule getJobSchedule()
public T setJobSchedule(JPPFSchedule schedule)
// job expiration schedule
public JPPFSchedule getJobExpirationSchedule()
public T setJobExpirationSchedule(JPPFSchedule schedule)
As we can see, this is all about getting and setting an instance of JPPFSchedule. A schedule is normally defined through
one of its constructors:
As a fixed length of time
public JPPFSchedule(long duration)
The semantics are that the job will start duration milliseconds after it is received by the server. Here is an example:
JPPFJob myJob = new JPPFJob();
// set the job to start 5 seconds after being received
JPPFSchedule mySchedule = new JPPFSchedule(5000L);
myJob.getSLA().setJobSchedule(mySchedule);
As a specific date/time
public JPPFSchedule(String date, String dateFormat)
Here, the date format is specified as a pattern for a SimpleDateFormat instance.
Here is an example use of this constructor:
JPPFJob myJob = new JPPFJob();
String dateFormat = "MM/dd/yyyy hh:mm a z";
// set the job to expire on September 30, 2010 at 12:08 PM in the CEDT time zone
JPPFSchedule schedule = new JPPFSchedule("09/30/2010 12:08 PM CEDT", dateFormat);
myJob.getSLA().setJobExpirationSchedule(schedule);
4.5.2 Server side SLA attributes
A server-side SLA is described by the JobSLA class, defined as:
public class JobSLA extends JobCommonSLA<JobSLA> {
// Job priority
public int getPriority()
public JobSLA setPriority(int priority)
// Maximum number of nodes the job can run on
public int getMaxNodes();
public JobSLA setMaxNodes(int maxNodes);
// max number of groups of master/slaves nodes the job can run on at any given time
public int getMaxNodeProvisioningGroups()
public JobSLA setMaxNodeProvisioningGroups(int maxNodeProvisioningGroups)
// Whether the job is initially suspended
public boolean isSuspended()
public JobSLA setSuspended(boolean suspended)
// whether the job is a broadcast job
public boolean isBroadcastJob();
public JobSLA setBroadcastJob(boolean broadcastJob)
// whether the job should be canceled by the server when the client is disconnected
public boolean isCancelUponClientDisconnect()
public JobSLA setCancelUponClientDisconnect(boolean cancelUponClientDisconnect)
// expiration schedule for any subset of the job dispatched to a node
public JPPFSchedule getDispatchExpirationSchedule()
public JobSLA setDispatchExpirationSchedule(JPPFSchedule schedule)
// number of times a dispatched task can expire before it is finally cancelled
public int getMaxDispatchExpirations()
public JobSLA setMaxDispatchExpirations(int max)
// class path associated with the job
public ClassPath getClassPath()
public JobSLA setClassPath(ClassPath classpath)
// max number of task resubmits and whether to apply it upon node error
public int getMaxTaskResubmits()
public JobSLA setMaxTaskResubmits(int maxResubmits)
public boolean isApplyMaxResubmitsUponNodeError()
public JobSLA setApplyMaxResubmitsUponNodeError(boolean applyMaxResubmitsUponNodeError)
// global grid execution policy
public ExecutionPolicy getGridExecutionPolicy()
public JobSLA setGridExecutionPolicy(ExecutionPolicy policy)
// desired node configuration
public JPPFNodeConfigSpec getDesiredNodeConfiguration()
public JobSLA setDesiredNodeConfiguration(JPPFNodeConfigSpec nodeConfigurationSpec)
}
4.5.2.1 Job priority
The priority of a job determines the order in which the job will be executed by the server. It can be any integer value, such
that if jobA.getPriority() > jobB.getPriority() then jobA will be executed before jobB. There are situations where both jobs
may be executed at the same time, for instance if there remain any available nodes for jobB after jobA has been
dispatched. Two jobs with the same priority will have an equal share (as much as is possible) of the available grid nodes.
The priority attribute is also manageable, which means that it can be dynamically updated, while the job is still executing,
using the JPPF administration console or the related management APIs. The default priority is zero.
Example usage:
JPPFJob job1 = new JPPFJob();
job1.getSLA().setPriority(10); // create the job with a non-default priority
JPPFJob job2 = new JPPFJob();
job2.getSLA().setPriority(job1.getSLA().getPriority() + 1); // slightly higher priority
4.5.2.2 Maximum number of nodes
The maximum number of nodes attribute determines how many grid nodes a job can run on, at any given time. This is an
upper bound limit, and does not guarantee that always this number of nodes will be used, only that no more than this
number of nodes will be assigned to the job. This attribute is also non-distinctive, in that it does not specify which nodes
the job will run on. The default value of this attribute is equal to Integer.MAX_VALUE, i.e. 231-1
The resulting assignment of nodes to the job is influenced by other attributes, especially the job priority and an eventual
execution policy. The maximum number of nodes is also a manageable attribute, which means it can be dynamically
updated, while the job is still executing, using the JPPF administration console or the related management APIs.
Example usage:
JPPFJob job = new JPPFJob();
// this job will execute on a maximum of 10 nodes
job.getSLA().setMaxNodes(10);
4.5.2.3 Maximum number of node provisioning groups
A node provisioning group designates a set of nodes made of one master node and its provisioned slave nodes, if it has
any. The SLA allows restricting a job execution to a maximum number of node provisioning groups. This SLA attribute is
useful whenever you want to take advantage of the fact that, by definition, a master and its slave nodes all run on the
same machine, for instance to exploit data locality properties. This attribute's default value is Integer.MAX_VALUE (2^31 - 1).
Note that this attribute does not specifically restrict the total number of nodes the job can run on, since each master node
can have any number of slaves. For this, you also need to set the maximum number of nodes attribute. Additionally, this
attribute has no effect on the selection of nodes that are neither master nor slave, such as offline nodes.
Example usage:
JPPFJob job = new JPPFJob();
// only execute on a single group of master/slaves at a time
job.getSLA().setMaxNodeProvisioningGroups(1);
// further restrict to only the slave nodes in the provisioning group
job.getSLA().setExecutionPolicy(new Equal("jppf.node.provisioning.slave", true));
4.5.2.4 Initial suspended state
A job can be initially suspended. In this case, it will remain in the server's queue until it is explicitly resumed or canceled,
or if it expires (if a timeout was set), whichever happens first. A job can be resumed and suspended again any number of
times via the JPPF administration console or the related management APIs.
Example usage:
JPPFJob job = new JPPFJob();
// this job will be submitted to the server and will remain suspended until
// it is resumed or cancelled via the admin console or management APIs
job.getSLA().setSuspended(true);
4.5.2.5 Broadcast jobs
A broadcast job is a specific type of job, for which each task will be executed on all the nodes currently present in the
grid. This opens new possibilities for grid applications, such as performing maintenance operations on the nodes or
drastically reducing the size of a job that performs identical tasks on each node.
With regards to the job SLA, a job is set in broadcast mode via a boolean indicator, for which the interface JobSLA
provides the following accessors:
public boolean isBroadcastJob()
public JobSLA setBroadcastJob(boolean broadcastJob)
To set a job in broadcast mode:
JPPFJob myJob = new JPPFJob();
myJob.getSLA().setBroadcastJob(true);
With respect to the dynamic aspect of a JPPF grid, the following behavior is enforced:
• a broadcast job is executed on all the nodes connected to the driver, at the time the job is received by the JPPF
driver. This includes nodes that are executing another job at that time
• if a node dies or disconnects while the job is executing on it, the job is canceled for this node
• if a new node connects while the job is executing, the broadcast job will not execute on it
• a broadcast job does not return any results, i.e. it returns the tasks in the same state as they were submitted
Additionally, if local execution of jobs is enabled for the JPPF client, a broadcast job will not be executed locally. In other
words, a broadcast job is only executed on remote nodes.
4.5.2.6 Canceling a job upon client disconnection
By default, if the JPPF client is disconnected from the server while a job is executing, the server will automatically attempt
to cancel the job on all nodes it was dispatched to, and remove the job from the server queue. This behavior can be
disabled on a per-job basis, for example if you want the job to complete but do not need the execution results.
Example usage:
JPPFJob myJob = new JPPFJob();
myJob.getSLA().setCancelUponClientDisconnect(false);
4.5.2.7 Expiration of job dispatches
Definition: a job dispatch is the whole or part of a job that is dispatched by the server to a node.
The server-side job SLA enables specifying whether a job dispatch will expire, along with the behavior upon expiration.
This is done with a combination of two attributes: a dispatch expiration schedule, which specifies when the dispatch will
expire, and a maximum number of expirations after which the tasks in the dispatch will be cancelled instead of
resubmitted. By default, a job dispatch will not expire and the number of expirations is set to zero (tasks are cancelled
upon the first expiration, if any).
One possible use for this mechanism is to prevent resource-intensive tasks from bloating slow nodes, without having to
cancel the whole job or set timeouts on individual tasks.
Example usage:
JPPFJob job = new JPPFJob();
// job dispatches will expire if they execute for more than 5 seconds
job.getSLA().setDispatchExpirationSchedule(new JPPFSchedule(5000L));
// dispatched tasks will be resubmitted at most 2 times before they are cancelled
job.getSLA().setMaxDispatchExpirations(2);
4.5.2.8 Setting a class path onto the job
The classpath attribute of the job SLA allows sending library files along with the job and its tasks. Out of the box, this
attribute is only used by offline nodes, to work around the fact that offline nodes do not have remote class loading
capabilities. The class path attribute, empty by default but never null, is accessed with the following methods:
public class JobSLA extends JobCommonSLA {
// get / set the class path associated with the job
public ClassPath getClassPath()
public JobSLA setClassPath(ClassPath classpath)
}
We can see that a class path is represented by the ClassPath interface, defined as follows:
public interface ClassPath extends Serializable, Iterable<ClassPathElement> {
  // add an element to this classpath
  ClassPath add(ClassPathElement element);
  ClassPath add(String name, Location<?> location);
  ClassPath add(String name, Location<?> localLocation, Location<?> remoteLocation);
  // remove an element from this classpath
  ClassPath remove(ClassPathElement element);
  ClassPath remove(String name);
  // get an element with the specified name
  ClassPathElement element(String name);
  // get all the elements in this classpath
  Collection<ClassPathElement> allElements();
  // empty this classpath (remove all elements)
  ClassPath clear();
  // is this classpath empty?
  boolean isEmpty();
  // should the node force a reset of the class loader before executing the tasks?
  boolean isForceClassLoaderReset();
  void setForceClassLoaderReset(boolean forceReset);
}
Note that one of the add(...) methods takes a ClassPathElement parameter, while the others take a name with one
or two Location objects (see the Location API section). These methods are equivalent: for the last two, JPPF will
internally create instances of a default implementation of ClassPathElement (class ClassPathElementImpl). It is preferable
to avoid creating ClassPathElement instances directly, as this makes the code less cumbersome and keeps it independent
from any specific implementation.
Also note that ClassPath implements Iterable<ClassPathElement>, so that it can be used in for loops:
for (ClassPathElement elt: myJob.getSLA().getClassPath()) ...;
The ClassPathElement interface is defined as follows:
public interface ClassPathElement extends Serializable {
  // get the name of this classpath element
  String getName();
  // get the local (to the client) location of this element
  Location<?> getLocalLocation();
  // get the remote (local to the node) location of this element, if any
  Location<?> getRemoteLocation();
  // perform a validation of this classpath element
  boolean validate();
}
JPPF provides a default implementation ClassPathElementImpl which does not perform any validation, that is, its
validate() method always returns true.
Finally, here is an example of how this can all be put together:
JPPFJob myJob = new JPPFJob();
ClassPath classpath = myJob.getSLA().getClassPath();
// wrap a jar file into a FileLocation object
Location jarLocation = new FileLocation("libs/MyLib.jar");
// copy the jar file into memory
Location location = jarLocation.copyTo(new MemoryLocation(jarLocation.size()));
// or another way to do this:
location = new MemoryLocation(jarLocation.toByteArray());
// add it as classpath element
classpath.add("myLib", location);
// the following is functionally equivalent:
classpath.add(new ClassPathElementImpl("myLib", location));
// tell the node to reset the tasks classloader with this new class path
classpath.setForceClassLoaderReset(true);
4.5.2.9 Maximum number of tasks resubmits
As we have seen in the “resubmitting a task” section, tasks have the ability to schedule themselves for resubmission by
the server. The job server-side SLA allows you to set the maximum number of times this can occur, with the following
accessors:
public class JobSLA extends JobCommonSLA {
// get the maximum number of times a task can resubmit itself
// via AbstractTask.setResubmit(boolean)
public int getMaxTaskResubmits()
// set the maximum number of times a task can resubmit itself
public JobSLA setMaxTaskResubmits(int maxResubmits)
// Determine whether the max resubmits limit for tasks is also applied
// when tasks are resubmitted due to a node error
public boolean isApplyMaxResubmitsUponNodeError()
// Specify whether the max resubmits limit for tasks should also be applied
// when tasks are resubmitted due to a node error
public JobSLA setApplyMaxResubmitsUponNodeError(boolean applyMaxResubmitsUponNodeError)
}
The default value for the maxTaskResubmits attribute is 1, which means that by default a task can resubmit itself at
most once. Additionally, this attribute can be overridden by setting the maxResubmits attribute of individual tasks.
The applyMaxResubmitsUponNodeError flag is set to false by default. This means that, when tasks are
resubmitted due to a node connection error, the resubmission does not count with regard to the limit. To change this behavior,
setApplyMaxResubmitsUponNodeError(true) must be called explicitly.
Example usage:
public class MyTask extends AbstractTask<String> {
@Override public void run() {
// unconditional resubmit could lead to an infinite loop
setResubmit(true);
// the result will only be kept after the max number of resubmits is reached
setResult("success");
}
}
JPPFJob job = new JPPFJob();
job.add(new MyTask());
// tasks can be resubmitted 4 times, meaning they can execute up to 5 times total
job.getSLA().setMaxTaskResubmits(4);
// resubmits due to node errors are also counted
job.getSLA().setApplyMaxResubmitsUponNodeError(true);
// ... submit the job and get the results ...
4.5.2.10 Disabling remote class loading during job execution
Jobs can specify whether remote class loader lookups are enabled during their execution in a remote node. When remote
class loading is disabled, lookups are only performed in the local classpath of each class loader in the class loader
hierarchy, and no remote resource requests are sent to the server or client. This is done with the following accessors:
public class JobSLA extends JobCommonSLA {
// Determine whether remote class loading is enabled for the job. Defaults to true
public boolean isRemoteClassLoadingEnabled()
// Specify whether remote class loading is enabled for the job
public JobSLA setRemoteClassLoadingEnabled(boolean enabled)
}
Note 1: when remote class loading is disabled, the classes that the JPPF node normally loads from the server cannot be
loaded remotely either. It is thus required to have these classes in the node's local classpath, which is usually done by
adding the "jppf-server.jar" and "jppf-common.jar" files to the node's classpath.
Note 2: if a class is not found while remote class loading is disabled, it will remain not found, even if the next job specifies
that remote class loading is enabled. This is due to the fact that the JPPF class loaders maintain a cache of classes not
found to avoid unnecessary remote lookups. To avoid this behavior, the task class loader should be reset before the next
job is executed.
Example usage:
JPPFJob job = new JPPFJob();
// disable remote class loading at execution time
job.getSLA().setRemoteClassLoadingEnabled(false);
4.5.2.11 Grid policy
Jobs can also specify an execution policy that will be evaluated against the server and the totality of its nodes, instead of
just against individual nodes as for the SLA's execution policy attribute we saw earlier in this documentation.
This grid policy is defined as a normal execution policy with two differences:
• it is evaluated against the properties of the server
• it may include any number of server global policies that count the nodes matching a given node policy
This policy is accessible with the following setter and getter of the SLA:
public class JobSLA extends JobCommonSLA {
// Get the global grid execution policy
public ExecutionPolicy getGridExecutionPolicy()
// Set the global grid execution policy
public JobSLA setGridExecutionPolicy(ExecutionPolicy policy)
}
For example, to express and set the policy "execute the job when the server has at least 2 GB of available heap memory
and at least 3 nodes with more than 4 processing threads each", we would code something like this:
long GB = 1024L * 1024L * 1024L; // 1 GB
JPPFJob job = new JPPFJob();
// evaluated against each node's properties
ExecutionPolicy nodePolicy = new MoreThan("jppf.processing.threads", 4);
// evaluated against the server's properties
ExecutionPolicy gridPolicy = new AtLeast("availableMemory", 2*GB)
.and(new NodesMatching(Operator.MORE_THAN, 3, nodePolicy));
// set the grid policy onto the SLA
job.getSLA().setGridExecutionPolicy(gridPolicy);
4.5.2.12 Specifying the desired node configuration
It is possible for a job to specify the configuration of the nodes it needs to run on and force eligible nodes to update their
configuration accordingly and restart for the configuration changes to take place. The specified configuration includes all
existing JPPF properties, in particular "jppf.java.path" and "jppf.jvm.options", which allow specifying the JVM
and its options for running the node after restart. It also includes any custom, application-defined property that can be
expressed in a configuration file.
This is done with the following JobSLA methods:
public class JobSLA extends JobCommonSLA {
// Get the configuration of the node(s) this job should be executed on
public JPPFNodeConfigSpec getDesiredNodeConfiguration()
// Set the configuration of the node(s) this job should be executed on
public JobSLA setDesiredNodeConfiguration(JPPFNodeConfigSpec nodeConfigurationSpec)
}
The desired node configuration is specified as a JPPFNodeConfigSpec object, defined as follows:
public class JPPFNodeConfigSpec implements Serializable {
  // Initialize this object with a desired configuration and a restart flag set to true
  public JPPFNodeConfigSpec(TypedProperties desiredConfiguration)
    throws IllegalArgumentException
  // Initialize this object with a desired configuration and restart flag
  public JPPFNodeConfigSpec(TypedProperties desiredConfiguration, boolean forceRestart)
    throws IllegalArgumentException
  // Get the desired JPPF configuration of each node
  public TypedProperties getConfiguration()
  // Determine whether to force the restart of a node after reconfiguring it
  public boolean isForceRestart()
}
The configuration attribute specifies the properties that will be overridden or added to the node configuration. In terms
of node selection, the JPPF server will prioritize the nodes whose configuration most closely matches the desired one, by
computing a similarity score which relies on the distances between the string values of the desired and actual properties.
Only the properties specified in the configuration attribute are compared.
The forceRestart flag determines whether a node should be restarted when it matches exactly the desired
configuration. If set to true, the nodes will always be restarted. Otherwise, nodes that exactly match the desired
configuration will not be restarted.
It is important to note that this SLA attribute is evaluated in combination with the other attributes of the job SLA. In
particular, it should not be confused with the execution policy, which is used to first filter eligible nodes, whereas the
desired node configuration is applied to eligible nodes and triggers a configuration change and restart in those nodes.
There are restrictions as to the kind of nodes that can be affected by this SLA attribute: since a configuration change and
restart of the node is triggered, this can only be done with manageable nodes, which excludes offline nodes and Android
nodes. Furthermore, it does not apply to server-local nodes, since the node restart would also cause the server to be
restarted.
Lastly, it is strongly advised to use this SLA attribute in combination with the maximum number of nodes and a job
expiration: since the reconfiguration and restart is very disruptive for the nodes, it has a non-trivial impact on performance,
so you might want to limit the number of nodes that are restarted. Also, between the request for the node reconfiguration
and the time the node becomes available after restart, the server reserves the node for the specific job involved. Setting
an expiration timeout on the job ensures that the node can be reused for other jobs, should anything go wrong. In
effect, the server will remove all reservations for this job whenever it is cancelled or expires.
Example usage:
JPPFJob job = new JPPFJob();
// define the desired node configuration properties
TypedProperties props = new TypedProperties()
.set(JPPFProperties.JVM_OPTIONS, "-server -Xmx1g")
.setInt("property.1", 123456)
.setString("property.2", "abcdef");
// create the node config spec with restart only when the properties don't match
JPPFNodeConfigSpec desiredConfig = new JPPFNodeConfigSpec(props, false);
// set the corresponding SLA attribute
job.getSLA().setDesiredNodeConfiguration(desiredConfig);
// limit to 2 nodes max
job.getSLA().setMaxNodes(2);
// ensure the job expires after 10 minutes max
job.getSLA().setJobExpirationSchedule(new JPPFSchedule(10L*60L*1000L));
4.5.2.13 Specifying the job persistence
Job persistence in the driver is specified via the persistenceSpec attribute of the SLA:
public class JobSLA extends JobCommonSLA {
// Get the specification of the job persistence in the driver
public PersistenceSpec getPersistenceSpec()
}
This attribute is an instance of the class PersistenceSpec, defined as follows:
public class PersistenceSpec implements Serializable {
// Determine whether the job is persisted in the driver. Defaults to false
public boolean isPersistent()
// Specify whether the job is persisted in the driver
public PersistenceSpec setPersistent(boolean persistent)
// Whether the driver should automatically execute the persisted job upon restart.
// Defaults to false
public boolean isAutoExecuteOnRestart()
// Specify whether the driver should automatically execute the job after a restart
public PersistenceSpec setAutoExecuteOnRestart(boolean autoExecuteOnRestart)
// Whether the job should be deleted from the store upon completion. Defaults to true
public boolean isDeleteOnCompletion()
// Specify whether the job should be deleted from the store upon completion
public PersistenceSpec setDeleteOnCompletion(boolean deleteOnCompletion)
}
Instances of this class manage three boolean flags:
– the "persistent" flag determines whether the job is persisted at all. By default, it is set to false.
– the "delete on completion" flag determines whether the job should be removed from the store when it completes. This
flag is set to true by default.
– the "auto execute on restart" flag tells a driver that, upon restart, it should automatically resubmit the job's unexecuted
tasks until the job completes. This flag is set to false by default.
The following example shows how we would configure a persistent job that should be automatically executed upon driver
restart and deleted from the store upon completion:
JPPFJob job = new JPPFJob();
job.getSLA().getPersistenceSpec()
.setPersistent(true)
.setAutoExecuteOnRestart(true)
.setDeleteOnCompletion(true);
4.5.3 Client side SLA attributes
A client-side SLA is described by the class JobClientSLA, defined as:
public class JobClientSLA extends JobCommonSLA {
// The maximum number of channels the job can be sent through,
// including the local executor if any is configured
public int getMaxChannels()
public JobClientSLA setMaxChannels(int maxChannels)
}
Note: since JPPF clients do not have a management interface, none of the client-side SLA attributes are manageable.
4.5.3.1 Maximum number of execution channels
The maximum number of channels attribute determines how many server connections a job can be sent through, at any
given time. This is an upper bound limit, and does not guarantee that this number of channels will always be used. This
attribute is also non-specific, since it does not specify which channels will be used. The default value of this attribute is 1.
Using more than one channel for a job enables faster I/O between the client and the server, since the job can be split in
multiple chunks and sent to the server via multiple channels in parallel.
Note 1: when the JPPF client is configured with a single server connection, this attribute has no effect.
Note 2: when local execution is enabled in the JPPF client, the local executor counts as one (additional) channel.
Note 3: the resulting assignment of channels to the job is influenced by other attributes, especially the execution policy.
Example usage:
JPPFJob job = new JPPFJob();
// use 2 channels to send the job and receive the results
job.getClientSLA().setMaxChannels(2);
4.6 Job Metadata
It is possible to attach user-defined metadata to a job, to describe the characteristics of the job and its tasks. This
additional data can then be reused by customized load-balancing algorithms, to perform load balancing based on
knowledge about the jobs. For instance, the metadata could provide information about the memory footprint of the tasks
and about their duration, which can be critical data for the server, in order to determine on which nodes the job or tasks
should be executed.
The job metadata is encapsulated in a specific interface: JobMetadata, and can be accessed from the job as follows:
JPPFJob job = ...;
JobMetadata metaData = job.getMetadata();
JobMetadata is defined as follows:
public interface JobMetadata extends Metadata {
}
As for the data provider, the API is actually defined by the Metadata interface:
public interface Metadata extends Serializable {
  // Set a parameter in the metadata
  <T> Metadata setParameter(Object key, T value);
  // Retrieve a parameter in the metadata
  <T> T getParameter(Object key);
  // Retrieve a parameter in the metadata, with a default value if not found
  <T> T getParameter(Object key, T defaultValue);
  // Remove a parameter from the metadata
  <T> T removeParameter(Object key);
  // Get the metadata map
  Map<Object, Object> getAll();
  // Clear all the metadata
  void clear();
}
Here is an example use:
JPPFJob job = ...;
JobMetadata metaData = job.getMetadata();
metaData
  // set the memory footprint of each task to 10 KB
  .setParameter("task.footprint", "" + (10 * 1024))
  // set the duration of each task to 80 milliseconds
  .setParameter("task.duration", "80");
Related sample: “CustomLoadBalancer” in the JPPF samples pack.
4.7 Execution policies
An execution policy is an object that determines whether a particular set of JPPF tasks can be executed on a JPPF node
(for the server-side SLA) or if it can be sent via a communication channel (for the client-side). It does so by applying the
set of rules (or tests) it is made of, against a set of properties associated with the node or channel.
There are other uses for execution policies in JPPF, in particular for node selection in some of the management APIs.
The available properties include:
– JPPF configuration properties
– System properties (including -D*=* properties specified on the JVM command line)
– Environment variables (e.g. PATH, JAVA_HOME, etc.)
– Networking: list of ipv4 and ipv6 addresses with corresponding host name when it can be resolved
– Runtime information such as maximum heap memory, number of available processors, etc...
– Disk space and storage information
– A special boolean property named “jppf.channel.local” which indicates whether a node (server-side) or communication
channel (client-side) is local to the JPPF server or client, respectively
– Operating system information, such as OS name, version and architecture, physical RAM, swap space, CPU load
The kind of tests that can be performed apply to the value of a property, and include:
– Binary comparison operators: ==, <, <=, >, >= ; for instance: ''property_value <= 15''
– Range operators (intervals): ''property_value in'' [a,b] , [a,b[ , ]a,b] , ]a,b[
– "One of" operator (discrete sets): ''property_value in { a1, ... , aN }''
– "Contains string" operator: ''property_value contains "substring"''
– Regular expressions: '' property_value matches 'regexp' ''
– Expressions or scripts written in a script language such as Groovy or JavaScript
– Custom, user-defined tests
The tests can also be combined into complex expressions using the boolean operators NOT, AND, OR and XOR.
Using this mechanism, it is possible to write execution policies such as:
"Execute on a node only if the node has at least 256 MB of memory and at least 2 CPUs available"
“Execute the job only in the client's local executor”
In the context of a server-side SLA, an execution policy is sent along with the tasks to the JPPF driver, and evaluated by
the driver. It does not need to be sent to the nodes.
For a detailed and complete description of all policy elements, operators and available properties, please refer to the
chapter Appendix B: Execution policy reference.
4.7.1 Creating and using an execution policy
An execution policy is an object whose type is a subclass of ExecutionPolicy. It can be built in 2 ways:
By API, using the classes in the org.jppf.node.policy package.
Example:
// define a policy allowing only nodes with 2 processing threads or more
ExecutionPolicy atLeast2ThreadsPolicy = new AtLeast("jppf.processing.threads", 2);
// define a policy allowing only nodes that are part of the "mydomain.com"
// internet domain (case ignored)
ExecutionPolicy myDomainPolicy = new Contains("ipv4.addresses", true, "mydomain.com");
// define a policy that requires both of the above to be satisfied
ExecutionPolicy myPolicy = atLeast2ThreadsPolicy.and(myDomainPolicy);
Alternatively, this could be written in a single statement:
// define the same policy in one statement
ExecutionPolicy myPolicy = new AtLeast("jppf.processing.threads", 2).and(
new Contains("ipv4.addresses", true, "mydomain.com"));
Using an XML policy document:
Example XML policy:
<ExecutionPolicy>
  <AND>
    <AtLeast>
      <Property>jppf.processing.threads</Property>
      <Value>2</Value>
    </AtLeast>
    <Contains ignoreCase="true">
      <Property>ipv4.addresses</Property>
      <Value>mydomain.com</Value>
    </Contains>
  </AND>
</ExecutionPolicy>
As you can see, this is the exact equivalent of the policy we constructed programmatically before.
To transform this XML policy into an ExecutionPolicy object, we will have to parse it using the PolicyParser API,
by means of one of the following methods:
static ExecutionPolicy parsePolicy(String)      // parse from a string
static ExecutionPolicy parsePolicyFile(String)  // parse from a file
static ExecutionPolicy parsePolicy(File)        // parse from a file
static ExecutionPolicy parsePolicy(Reader)      // parse from a Reader
static ExecutionPolicy parsePolicy(InputStream) // parse from an InputStream
Example use:
// parse the specified XML file into an ExecutionPolicy object
ExecutionPolicy myPolicy = PolicyParser.parsePolicyFile("../policies/MyPolicy.xml");
It is also possible to validate an XML execution policy against the JPPF Execution Policy schema using one of the
validatePolicy() methods of PolicyParser:
static ExecutionPolicy validatePolicy(String)      // validate from a string
static ExecutionPolicy validatePolicyFile(String)  // validate from a file
static ExecutionPolicy validatePolicy(File)        // validate from a file
static ExecutionPolicy validatePolicy(Reader)      // validate from a Reader
static ExecutionPolicy validatePolicy(InputStream) // validate from an InputStream
To enable validation, the document's namespace must be specified in the root element:
<jppf:ExecutionPolicy xmlns:jppf="http://www.jppf.org/schemas/ExecutionPolicy.xsd">
  ...
</jppf:ExecutionPolicy>
Example use:
public ExecutionPolicy createPolicy(String policyPath) {
try {
// validate the specified XML file
PolicyParser.validatePolicyFile(policyPath);
} catch(Exception e) {
// the validation and parsing errors are in the exception message
System.err.println("The execution policy " + policyPath +
" is not valid: " + e.getMessage());
return null;
}
// the policy is valid, we can parse it safely
return PolicyParser.parsePolicyFile(policyPath);
}
4.7.2 Scripted policies
As we have seen earlier, execution policies are objects whose class extends ExecutionPolicy. The evaluation of an
execution policy is performed by calling its accepts() method, which returns either true or false. A script policy is a
special type of policy which can execute a script written in a script language. The result of the evaluation of this script,
which must be a boolean, will be the value returned by its accepts() method.
By default, JPPF provides engines for Groovy and JavaScript with the Rhino engine, however additional script languages
can be added via the service provider interface (SPI).
4.7.2.1 Creating scripted policies
At runtime, a scripted policy is an instance of ScriptedPolicy, which defines the following constructors:
public class ScriptedPolicy extends ExecutionPolicy {
// create with a script read from a string
public ScriptedPolicy(String language, String script)
// create with a script read from a reader
public ScriptedPolicy(String language, Reader scriptReader) throws IOException
// create with a script read from a file
public ScriptedPolicy(String language, File scriptFile) throws IOException
}
The equivalent XML is as follows:
<Script language="groovy">return true</Script>
As for any other execution policy predicate, scripted policies can be combined with other predicates, using the logical
operators AND, OR, XOR and NOT, for instance:
// Java
ExecutionPolicy policy = new AtLeast("processing.threads", 2).and(
  new ScriptedPolicy("groovy", "return true"));
<!-- equivalent XML -->
<AND>
  <AtLeast>
    <Property>processing.threads</Property>
    <Value>2</Value>
  </AtLeast>
  <Script language="groovy">return true</Script>
</AND>
The script must either be an expression which resolves to a boolean value, or return a boolean value. For instance, the
Groovy statement “return true” and the Groovy expression “true” will work seamlessly.
4.7.2.2 Predefined variable bindings
6 pre-defined variables are made available to the scripts:
• jppfSystemInfo: the parameter passed to the policy's accepts() method, its type is JPPFSystemInformation
• jppfSla: the server-side SLA of the job being matched, if available, of type JobSLA
• jppfClientSla: the client-side SLA of the job being matched, if available, of type JobClientSLA
• jppfMetadata: the metadata of the job being matched, if available, of type JobMetadata
• jppfDispatches: the number of nodes the job is already dispatched to, of type int
• jppfStats: the server statistics, of type JPPFStatistics
For example, let's look at the following JavaScript script, which determines the node participation based on a job's priority:
if the priority is 1 or less, the job can use no more than 10% of the total number of nodes, if the job priority is 2 then it can
use no more than 20%, ... up to 90% if the priority is 9 or more:
function accepts() {
  // total nodes in the grid from the server statistics
  var totalNodes = jppfStats.getSnapshot("nodes").getLatest();
  // the job priority
  var prio = jppfSla.getPriority();
  // determine max allowed nodes for the job, as % of total nodes
  var maxPct = (prio <= 1) ? 0.1 : (prio >= 9 ? 0.9 : prio / 10.0);
  // return true if current nodes for the job is less than max %
  return jppfDispatches < totalNodes * maxPct;
}
// returns a boolean value
accepts();
Let's say this script is stored in a file located at ./policies/NodesFromPriority.js; we could then create an execution policy
from it with the following code:
ScriptedPolicy policy =
new ScriptedPolicy("javascript", new File("policies/NodesFromPriority.js"));
4.7.2.3 Adding available languages
The JPPF scripting APIs rely entirely on the JSR 223 specification, which is implemented in the JDK's javax.script
package. This means that JPPF will be able to use any script language made available to the JVM, including the default
JavaScript engine (i.e. Rhino in JDK 7 and Nashorn in JDK 8).
Thus to add a new language, all that is needed is to add the proper jar files, which declare a JSR-223 compliant script
engine via the documented SPI discovery mechanism. For example, you can add the Groovy language by simply adding
groovy-all-x.y.z.jar to the classpath, because it implements the JSR 223 specification (the jar file is located in the JPPF
source distribution at JPPF/lib/Groovy/groovy-all-1.6.5.jar).
4.7.3 Execution policy context
Each execution policy has access to a set of contextual information during its evaluation. This information pertains to the
job against which the policy is evaluated, along with a snapshot of the server statistics (for server-side policies) at the time
of the evaluation. This context is available with the method ExecutionPolicy.getContext(), which returns a PolicyContext
object, defined as follows:
public class PolicyContext {
// Get the job server side SLA, set at runtime by the server
public JobSLA getSLA()
// Get the job client side SLA, set at runtime by the server
public JobClientSLA getClientSLA()
// Get the job metadata, set at runtime by the server
public JobMetadata getMetadata()
// Get the number of nodes the job is already dispatched to
public int getJobDispatches()
// Get the server statistics
public JPPFStatistics getStats()
}
Note that, depending on where the execution policy is evaluated, some parts of the context may not be available:
– when the policy is evaluated in a client local executor, the server statistics are not available
– when the policy is evaluated by the server-side scheduler, the client-side SLA is not available
– when the policy is evaluated via a call to any method of JPPFDriverAdminMBean which takes a NodeSelector or
ExecutionPolicy parameter, only the server statistics are available.
The context is available to any execution policy, however it will be especially useful for custom policies. For scripted
policies, the elements of information it provides are already split into separate variable bindings. Furthermore, keep in
mind that the policy context is only valid in the scope of the policy's accepts() method.
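For instance, here is a minimal sketch of a custom policy using the context; the class name MyContextPolicy and the threshold of 5 dispatches are purely illustrative:
import org.jppf.node.policy.CustomPolicy;
import org.jppf.node.policy.PolicyContext;
import org.jppf.utils.PropertiesCollection;

// illustrative policy: accept a node only while the job is dispatched to fewer than 5 nodes
public class MyContextPolicy extends CustomPolicy {
  @Override public boolean accepts(PropertiesCollection info) {
    // the context is only valid within the scope of this method
    PolicyContext ctx = getContext();
    // compare the current number of dispatches to the (illustrative) threshold
    return ctx.getJobDispatches() < 5;
  }
}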
4.7.4 Custom policies
It is possible to apply user-defined policies. When you do so, a number of constraints must be respected:
• the custom policy class must extend CustomPolicy
• the custom policy class must be deployed in the JPPF server classpath as well as the client's
Here is a sample custom policy code:
package mypackage;
import org.jppf.utils.PropertiesCollection;
import org.jppf.node.policy.CustomPolicy;
// define a policy allowing only nodes with 2 processing threads or more
public class MyCustomPolicy extends CustomPolicy {
@Override public boolean accepts(PropertiesCollection info) {
// get the value of the "processing.threads" property
String s = this.getProperty(info, "processing.threads");
int n = -1;
try { n = Integer.valueOf(s); }
catch(NumberFormatException e) { // process the exception }
}
// node is accepted only if number of threads >= 2
return n >= 2;
}
Now, let's imagine that we want our policy to be more generic, and to accept nodes with at least a parametrized number
of threads given as argument to the policy.
Our policy then becomes:
public class MyCustomPolicy extends CustomPolicy {
  public MyCustomPolicy(String...args) { super(args); }

  @Override public boolean accepts(PropertiesCollection info) {
    // get the value to compare with, passed as the first argument to this policy
    String s1 = getArgs()[0];
    int param = -1;
    try { param = Integer.valueOf(s1); }
    catch(NumberFormatException e) { /* handle the exception */ }
    String s2 = getProperty(info, "processing.threads");
    int n = -1;
    try { n = Integer.valueOf(s2); }
    catch(NumberFormatException e) { /* handle the exception */ }
    return n >= param; // node is accepted only if number of threads >= param
  }
}
Here we use the getArgs() method which returns an array of strings, corresponding to the arguments passed in the
XML representation of the policy.
To illustrate how to use a custom policy in an XML policy document, here is an example XML representation of the
custom policy we created above:
<CustomRule class="mypackage.MyCustomPolicy">
  <Arg>3</Arg>
</CustomRule>
The "class" attribute is the fully qualified name of the custom policy class. There can be any number of <Arg> elements;
these are the parameters that will then be accessible through CustomPolicy.getArgs().
When the XML descriptor is parsed, an execution policy object will be created exactly as in this code snippet:
MyCustomPolicy policy = new MyCustomPolicy();
policy.setArgs( "3" );
Finally, to enable the use of this custom policy, you will need to add the corresponding class(es) to both the server's and
the client's classpath, within either a jar file or a class folder.
4.7.5 Server global policies
JPPF 5.2 introduced a new type of execution policy which applies globally to all the nodes connected to a given server. It
allows expressing rules such as "execute a job only if there are more than 4 nodes, each with at least 2 processors and at
least 4 GB of memory".
This type of execution policy is represented by the class NodesMatching, defined as follows:
public class NodesMatching extends ExecutionPolicy {
// Initialize this execution policy
public NodesMatching(Operator operator, long expectedNodes, ExecutionPolicy nodePolicy)
// Evaluate this policy against the nodes
public boolean accepts(PropertiesCollection info)
}
The interesting part is in the constructor, which takes the following parameters:
• operator: one of the possible comparison operators defined in the Operator enum, that is, one of EQUAL,
NOT_EQUAL, LESS_THAN, MORE_THAN, AT_LEAST, AT_MOST.
• expectedNodes: this is the number of nodes expected to match the comparison with the actual number of nodes
that satisfy the nodePolicy parameter
• nodePolicy: an execution policy that will be evaluated against all the nodes. The number of nodes matching this
policy will be compared to the expectedNodes parameter using the operator parameter as a comparison operator.
Note: it is important to remember that a NodesMatching policy is evaluated against the server, whereas the node policy
in its constructor is evaluated against each individual node.
As an example, we would write the policy expressed above as follows:
// 1 GB = 1,073,741,824 bytes
long GB = 1024*1024*1024;
// node with at least 2 processors and at least 4 GB of heap
ExecutionPolicy nodePolicy = new AtLeast("availableProcessors", 2)
.and(new AtLeast("maxMemory", 4*GB));
// more than 4 nodes satisfying the node policy
ExecutionPolicy globalPolicy = new NodesMatching(Operator.MORE_THAN, 4, nodePolicy);
Alternatively, it can also be written as an XML document:
<NodesMatching operator="MORE_THAN" expected="4">
  <AND>
    <AtLeast>
      <Property>availableProcessors</Property>
      <Value>2</Value>
    </AtLeast>
    <AtLeast>
      <Property>maxMemory</Property>
      <Value>4294967296</Value>
    </AtLeast>
  </AND>
</NodesMatching>
4.7.6 Execution policy arguments as expressions
Since JPPF 6.0, the arguments of most execution policies can also be expressed as the values of JPPF properties
including property substitutions and scripted expressions.
As an example, let's imagine the configuration of a node contains the following properties:
int.1 = 1
int.2 = 2
int.3 = 3
Now let's say we define an execution policy where jobs only execute on nodes that have at least 4 processing threads:
ExecutionPolicy policy = new AtLeast("jppf.processing.threads", 4);
Further, let's say that the expected number of processing threads is not static, and actually depends on the node's
configuration. We could then rewrite the policy as in this example:
ExecutionPolicy policy = new AtLeast("jppf.processing.threads",
"$script{ ${int.1} + ${int.3} }$");
Given the node configuration above, we can see that the value of the expression will resolve to 4 for this node.
We distinguish two cases, depending on the semantics of the argument:
1) Property name argument: this argument is always the first in an execution policy's constructor. It can now be either:
• a literal which represents the name of a property. This preserves the behavior from before JPPF 6.0
• or an expression that resolves to a value of the type handled by the execution policy
The limitation here is that this argument can never be a literal value, since any literal is interpreted as the name of a
property. If a literal value is required, it may be specified instead in one of the right-side arguments (if any), or as a
scripted expression, for instance: "$script{4}$".
In the example above, we used a property name expressed as the literal "jppf.processing.threads". We could also
write it as a completely unrelated expression, for instance:
ExecutionPolicy policy = new AtLeast("$script{ 2 * ${int.3} }$",
"$script{ ${int.1} + ${int.3} }$");
For the node we considered, this policy would resolve to the comparison "6 >= 4".
2) Right-side argument(s): as seen in the example above, the arguments on the right side can be expressions that must
resolve to the expected type of the argument, or to a literal of the expected type. For example, the policies below resolve
to the same comparison "valueOf("jppf.processing.threads") >= 4" when evaluated against our node:
ExecutionPolicy p1 = new AtLeast("jppf.processing.threads", "$script{ 2 * ${int.2} }$");
ExecutionPolicy p2 = new AtLeast("jppf.processing.threads", "4");
ExecutionPolicy p3 = new AtLeast("jppf.processing.threads", 4);
Important: all scripted expressions have access to a predefined variable named "jppfSystemInfo" which references
an object of type JPPFSystemInformation.
Note 1: support for arguments as expressions is clearly documented in each execution policy's javadoc
Note 2: remember that in an expression, property substitutions are evaluated first, and scripted expressions are
evaluated after that.
4.8 The JPPFClient API
A JPPF client is an object that will handle the communication between the application and the server. Its role is to:
• manage one or multiple connections with the server
• submit jobs and get their results
• handle notifications of job results
• manage each connection's life cycle events
• provide the low-level machinery on the client side for the distributed class loading mechanism
• provide an access point for the management and monitoring of each server
A JPPF client is represented by the class JPPFClient. We will detail its functionalities in the next sub-sections.
4.8.1 Creating and closing a JPPFClient
A JPPF client is a Java object, and is created via one of the constructors of the class JPPFClient. Each JPPF client has
a unique identifier that is always transported along with any job that is submitted by this client. This identifier is what
allows JPPF to know from where the classes used in the tasks should be loaded. In effect, each node in the grid maintains
a map associating each client identifier with a unique class loader, creating the class loader when needed. The implication is that, if
a new client identifier is specified, the classes used in any job / task submitted by this client will be dynamically reloaded.
This is what enables the immediate dynamic redeployment of code changes in the application. On the other hand, if a
previously existing identifier is reused, then no dynamic redeployment occurs, and code changes will be ignored (i.e. the
classes already loaded by the node will be reused), even if the application is restarted between 2 job submissions.
There are two forms of constructors for JPPFClient, each with a specific corresponding semantics:
Generic constructor with automatic identifier generation
public JPPFClient()
When using this constructor, JPPF will automatically create a universal unique identifier (uuid) that is guaranteed to be
unique on the grid. The first submission of a job will cause the classes it uses to be dynamically loaded by any node that
executes the job.
Constructor specifying a user-defined client identifier
public JPPFClient(String uuid)
In this case, the classes used by a job will be loaded only the first time they are used, including if the application has been
restarted in the meantime, or if the JPPF client is created from a separate application. This behavior is more adapted to
an application deployed in production, where the client identifier would only change when a new version of the application
is deployed on the grid. It is a good practice to include a version number in the identifier.
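For example, with an illustrative identifier embedding a version number:
// the uuid embeds an application version number: nodes will reload the
// application classes only when a new version is deployed
JPPFClient jppfClient = new JPPFClient("MyApplication-v1.2");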
As a JPPFClient uses a number of system and network resources, it is recommended to use it as a singleton. It is
designed for concurrent use by multiple threads, which makes it safe for use with a singleton pattern. It is also
recommended to release these resources when they are no longer needed, via a call to the JPPFClient.close()
method. The following code sample illustrates what is considered a best practice for using a JPPFClient:
public class MyApplication {
  // singleton instance of the JPPF client
  private static JPPFClient jppfClient = new JPPFClient();

  // allows access to the client from any other class
  public static JPPFClient getJPPFClient() {
    return jppfClient;
  }

  public static void main(String...args) {
    // enclosed in a try / finally to ensure resources are properly released
    try {
      // ... application-specific code here ...
    } finally {
      // close the client to release its resources
      if (jppfClient != null) jppfClient.close();
    }
  }
}
4.8.2 Resetting the JPPF client
A JPPFClient can be reset at runtime, to allow the recycling of its server connections, along with dynamic reloading of its
configuration. Two methods are provided for this:
public class JPPFClient extends AbstractGenericClient {
// close this client, reload the configuration, then open it again
public void reset()
// close this client, then open it again using the specified configuration
public void reset(TypedProperties configuration)
}
Note that jobs that were already submitted by the client are not lost: they remain queued in the client and will be
resubmitted as soon as one or more server connections become available again.
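As an illustration, here is a minimal sketch of a reset with an updated configuration; the property values are only examples:
JPPFClient jppfClient = new JPPFClient();
// ... submit jobs ...
// build an updated configuration (values are illustrative)
TypedProperties config = new TypedProperties()
  .setString("jppf.drivers", "driver1")
  .setString("driver1.jppf.server.host", "my.new.host.com")
  .setInt("driver1.jppf.server.port", 11111);
// close all server connections, then reopen them with the new configuration
jppfClient.reset(config);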
4.8.3 Submitting a job
To submit a job, JPPFClient provides a single method:
public List<Task<?>> submitJob(JPPFJob job)
This method has two different behaviors, depending on whether the job is blocking or non-blocking:
• blocking job: the submitJob() method blocks until the job execution is complete. The return value is a list of tasks
with their results, in the same order as the tasks that were added to the job.
• non-blocking job: submitJob() returns immediately with a null value. It is up to the developer to collect the
execution results, generally via the JPPF job API.
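For example, submitting a blocking job and processing its results could look like the following sketch, reusing the MyTask class defined in section 4.5.2.9:
JPPFClient jppfClient = new JPPFClient();
JPPFJob job = new JPPFJob();
job.add(new MyTask());
// blocks until the job completes
List<Task<?>> results = jppfClient.submitJob(job);
for (Task<?> task : results) {
  // a non-null throwable indicates an error during execution
  if (task.getThrowable() != null) task.getThrowable().printStackTrace();
  else System.out.println("result: " + task.getResult());
}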
4.8.4 Cancelling a job
The ability to cancel a job is provided by JPPFClient's superclass AbstractGenericClient, which provides a
cancelJob() method, defined as follows:
// superclass of JPPFClient
public abstract class AbstractGenericClient extends AbstractJPPFClient {
  // cancel the job with the specified UUID
  public boolean cancelJob(final String jobUuid) throws Exception;
}
This will work even if the client is connected to multiple drivers. In this case, it will send the cancel request to all drivers.
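For example, a minimal sketch canceling a non-blocking job some time after its submission:
JPPFClient jppfClient = new JPPFClient();
JPPFJob job = new JPPFJob();
// make the job non-blocking, so that submitJob() returns immediately
job.setBlocking(false);
// ... add tasks to the job ...
jppfClient.submitJob(job);
// later on, cancel the job on all connected drivers
jppfClient.cancelJob(job.getUuid());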
4.8.5 Switching local execution on or off
The JPPFClient API allows users to dynamically turn the local (in the client JVM) execution of jobs on or off, and
determine whether it is active or not. This is done via these two methods:
// Determine whether local execution is enabled on this client
public boolean isLocalExecutionEnabled()
// Specify whether local execution is enabled on this client
public void setLocalExecutionEnabled(boolean localExecutionEnabled)
Turning local execution on or off will affect the next job to be executed, but not any that is currently executing.
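For example:
JPPFClient jppfClient = new JPPFClient();
// turn local execution on if it is not already enabled;
// this only affects jobs submitted afterwards
if (!jppfClient.isLocalExecutionEnabled()) {
  jppfClient.setLocalExecutionEnabled(true);
}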
4.8.6 Registering additional class loaders to handle requests from the driver
In some unusual use cases, it may be necessary to register additional class loaders with the client for a given job. This
can happen if some of the tasks in the job are loaded with different class loaders, or in some situations when the job is
submitted from a task executing in a node. To enable this, the super class of JPPFClient, AbstractGenericClient,
provides the following method:
public class AbstractGenericClient extends AbstractJPPFClient {
// register a class loader for the specified job uuid
public ClassLoader registerClassLoader(ClassLoader cl, String uuid)
}
The class loader must be registered before the job is submitted, otherwise it will have no effect. Additionally, the
unregistration of the class loader is automatically performed by the JPPF client, once the job has completed.
Example usage:
JPPFClient client = new JPPFClient();
JPPFJob job = new JPPFJob();
// let's assume MyTask is loaded from a separate class loader
MyTask myTask = ...;
job.add(myTask);
ClassLoader cl = myTask.getClass().getClassLoader();
// register the class loader for the job, before submitting it
client.registerClassLoader(cl, job.getUuid());
List<Task<?>> results = client.submitJob(job);
4.8.7 Changing and retrieving the load-balancer settings
The load-balancer configuration can be dynamically updated and retrieved, even while the client is executing jobs, using
the following methods in JPPFClient:
public class JPPFClient extends AbstractGenericClient {
  // Get the current load-balancer settings
  public LoadBalancingInformation getLoadBalancerSettings()
  // Change the load balancer settings
  public void setLoadBalancerSettings(String algorithm, Properties parameters)
    throws Exception
}
The parameters of the setLoadBalancerSettings() method are those of the load-balancing algorithm and must
not be prefixed. The getLoadBalancerSettings() method provides the current configuration encapsulated in a
LoadBalancingInformation object.
Example usage:
JPPFClient jppfClient = new JPPFClient();
// configure the "proportional" load-balancer algorithm
TypedProperties props = new TypedProperties()
.setInt("initialSize", 5)
.setInt("proportionalityFactor", 1);
jppfClient.setLoadBalancerSettings("proportional", props);
// retrieve the load-balancer configuration
LoadBalancingInformation lbi = jppfClient.getLoadBalancerSettings();
System.out.println("current algorithm is " + lbi.getAlgorithm());
System.out.println("algorithm parameters: " + lbi.getParameters());
4.9 Connection pools
All server connections in a JPPF client are organized into connection pools, whose number is determined by the client
configuration properties, and whose size is based on the configuration as well and can also be changed dynamically via
the JPPF APIs. In the next sections, we will see how connection pools can be configured, explored and programmatically
accessed.
4.9.1 The JPPFConnectionPool class
A connection pool is represented by the JPPFConnectionPool class, defined as follows:
public class JPPFConnectionPool extends AbstractConnectionPool<JPPFClientConnection>
    implements Comparable<JPPFConnectionPool> {
// Get the name of this pool
public String getName()
// Get the id of this pool, unique within a JPPFClient instance
public int getId()
// Get the priority associated with this pool
public int getPriority()
// Check whether this pool is for SSL connections
public boolean isSslEnabled()
// Get the uuid of the driver to which connections in this pool are connected
public String getDriverUuid()
// Get the host name or IP address of the remote driver
public String getDriverHost()
// Get the port number to which the client connects on the remote driver
public int getDriverPort()
// Set the maximum size of this pool, starting or stopping connections as needed
public int setSize(int size)
// Get a list of connections in this pool whose status is one of those specified
public List<JPPFClientConnection> getConnections(JPPFClientConnectionStatus...statuses)
// Get the number of connections in this pool whose status is one of those specified
public int connectionCount(JPPFClientConnectionStatus...statuses)
}
As we can see, JPPFConnectionPool extends the class AbstractConnectionPool, which in turn implements the interface
ConnectionPool, defined as follows:
public interface ConnectionPool<E extends AutoCloseable>
    extends Iterable<E>, AutoCloseable {
// Get the next connection that is connected and available
E getConnection();
// Determine whether this pool is empty
boolean isEmpty();
// Get the current size of this pool
int connectionCount();
// Get the maximum size of this connection pool
int getSize();
// Set the maximum size of this pool, starting or stopping connections as needed
int setSize(int size);
// Get a list of connections held by this pool
List<E> getConnections();
}
The pool name is based on the client configuration properties and defined as follows:
With server discovery disabled:
jppf.discovery.enabled = false
jppf.drivers = ...
Each driver name specified in “jppf.drivers” will be the name of the corresponding connection pool.
With discovery enabled: for each discovered server, the JPPF client will create a connection pool named
“jppf_discovery-n” where n is a sequence number automatically assigned by the client.
The id attribute of a pool is a sequence number that is guaranteed to be unique within a JPPF client. It is used to
distinguish pools that may have the same driver uuid, priority and size. The name may be used similarly, however JPPF
does not do any checking on pool names, so they should be used with caution.
The pool's priority is as defined in the configuration. For instance if we have the following:
jppf.drivers = myPool
myPool.jppf.server.host = www.myhost.com
myPool.jppf.server.port = 11111
myPool.jppf.priority = 10
# pool size
myPool.jppf.pool.size = 5
The corresponding JPPFConnectionPool object's getPriority() method will return 10.
In the same way, the getCoreSize() method will return 5.
The pool's actual size can be grown or shrunk dynamically, using the setSize(int) method. The JPPF client will create
or close connections accordingly. An attempt to set a max size equal to the current max size will have no effect
whatsoever.
In some cases, when trying to reduce the connection pool's size, there may be too many connections in the pool busy
executing jobs, and the client will not be able to close all the requested connections. In this case, setSize() will return the
new actual size, which will be larger than the requested size.
The pool size can also be defined in the configuration, either with the <driver_name>.jppf.pool.size property for
manually configured pools, or with jppf.pool.size for auto-discovered pools.
The two getConnections() methods allow you to explore the connections currently in the pool. The overloaded
version of this method permits filtering of the connections by their status, represented by one or more
JPPFClientConnectionStatus enum values.
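To illustrate, here is a short sketch combining setSize() and the filtering version of getConnections():
JPPFConnectionPool pool = ...;
// request to grow the pool to 8 connections;
// the returned value is the new actual size
int newSize = pool.setSize(8);
// list the connections currently executing a job
List<JPPFClientConnection> busy =
  pool.getConnections(JPPFClientConnectionStatus.EXECUTING);
System.out.println(busy.size() + " busy connection(s) out of " + pool.connectionCount());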
4.9.2 Client connections
4.9.2.1 The JPPFClientConnection interface
As we have seen, connection pools contain and manage a set of connections from a client to a driver. These connections
are represented by the JPPFClientConnection interface, defined as follows:
public interface JPPFClientConnection
extends ClientConnectionStatusHandler, AutoCloseable {
// Get the priority assigned to this connection
int getPriority();
// Shutdown this connection and release all the resources it is using
void close();
// Determine whether this connection was closed
boolean isClosed();
// Get the name assigned to this client connection
String getName();
// Determines if this connection is over SSL
boolean isSSLEnabled();
// Get the driver's host name or ip address
String getHost();
// Get the port number on which the driver is listening for connections
int getPort();
// Get the unique identifier of the remote driver
String getDriverUuid();
// Get the system information for the remote driver this connection refers to
JPPFSystemInformation getSystemInfo();
// Get the unique ID for this connection and its two channels
String getConnectionUuid();
// Get the pool this connection belongs to
JPPFConnectionPool getConnectionPool();
}
Note that most of these methods, except for close(), isClosed(), getConnectionUuid() and getName(),
actually delegate to the JPPFConnectionPool to which the connection belongs.
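For example, to obtain a connection from a pool and display some of its characteristics:
JPPFConnectionPool pool = ...;
// get the next available connected connection in the pool
JPPFClientConnection connection = pool.getConnection();
if (connection != null) {
  System.out.println("connected to " + connection.getHost() + ":" + connection.getPort()
    + (connection.isSSLEnabled() ? " with" : " without") + " SSL");
}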
4.9.2.2 Status notifications for existing connections
Each server connection has a status that depends on the state of its network connection to the server and whether it is
executing a job request. A connection status is represented by the enum JPPFClientConnectionStatus, and has the
following possible values: NEW, DISCONNECTED, CONNECTING, ACTIVE, EXECUTING, FAILED or CLOSED.
JPPFClientConnection extends the interface ClientConnectionStatusHandler, which provides the following methods
to handle the connection status and register or remove listeners:
public interface ClientConnectionStatusHandler {
// Get the status of this connection
JPPFClientConnectionStatus getStatus();
// Set the status of this connection
void setStatus(JPPFClientConnectionStatus status);
// Register a connection status listener with this connection
void addClientConnectionStatusListener(ClientConnectionStatusListener listener);
// Remove a connection status listener from the registered listeners
void removeClientConnectionStatusListener(ClientConnectionStatusListener listener);
}
Here is a sample status listener implementation:
public class MyStatusListener implements ClientConnectionStatusListener {
@Override
public void statusChanged(ClientConnectionStatusEvent event) {
// obtain the client connection from the event
JPPFClientConnection connection =
(JPPFClientConnection) event.getClientConnectionStatusHandler();
// get the new status
JPPFClientConnectionStatus status = connection.getStatus();
System.out.println("Connection " + connection.getName() + " status changed to "
+ status);
}
}
4.9.3 Exploring the connections in a pool
JPPFConnectionPool also provides a set of methods to wait for, and obtain, a set of connections that satisfy a
particular condition:
public class JPPFConnectionPool extends AbstractConnectionPool
    implements Comparable<JPPFConnectionPool> {
  // Wait for the specified number of connections to be in the ACTIVE status
  public List<JPPFClientConnection> awaitActiveConnections(
    Operator operator, int nbConnections)
  // Wait for the specified number of connections to be in ACTIVE or EXECUTING status
  public List<JPPFClientConnection> awaitWorkingConnections(
    Operator operator, int nbConnections)
  // Wait for the specified number of connections to be in one of the specified statuses
  public List<JPPFClientConnection> awaitConnections(
    Operator operator, int nbConnections, JPPFClientConnectionStatus... statuses)
  // Wait for the specified number of connections to be in one of the specified statuses
  // or the specified timeout to expire, whichever happens first
  public List<JPPFClientConnection> awaitConnections(Operator operator,
    int nbConnections, long timeout, JPPFClientConnectionStatus... statuses)
}
Examples:
JPPFConnectionPool pool = ...;
// wait until more than 2 connections in the pool are active
List<JPPFClientConnection> list = pool.awaitActiveConnections(Operator.GREATER, 2);
// wait for no more than 5 seconds or until at least one connection is closed
List<JPPFClientConnection> list2 = pool.awaitConnections(
  Operator.AT_LEAST, 1, 5000L, JPPFClientConnectionStatus.CLOSED);
The Operator parameter defines the possible conditions that the number of connections must satisfy:
public enum Operator {
  // The number of connections is equal to the expected number
  EQUAL,
  // The number of connections is different from the expected number
  NOT_EQUAL,
  // The number of connections is at least the expected number
  AT_LEAST,
  // The number of connections is at most the expected number
  AT_MOST,
  // The number of connections is strictly greater than the expected number
  GREATER,
  // The number of connections is strictly less than the expected number
  LESS
}
4.9.4 Associated JMX connection pool
Each connection pool has an associated pool of JMX connections to the same remote driver. To access and manipulate
this JMX pool, the JPPFConnectionPool class provides the following API:
public class JPPFConnectionPool extends AbstractConnectionPool
    implements Comparable<JPPFConnectionPool> {
  // Get a connected JMX connection among those in the JMX pool
  public JMXDriverConnectionWrapper getJmxConnection()
  // Get a JMX connection in the specified state from the JMX pool
  public JMXDriverConnectionWrapper getJmxConnection(boolean connectedOnly)
  // Get the jmx port to use on the remote driver
  public int getJmxPort()
  // Get the core size of the associated JMX connection pool
  public int getJMXPoolCoreSize()
  // Get the current maximum size of the associated JMX connection pool
  public int getJMXPoolMaxSize()
  // Set a new maximum size for the associated pool of JMX connections,
  // adding new or closing existing connections as needed
  public int setJMXPoolMaxSize(int maxSize)
  // Get the list of connections currently in the JMX pool
  public List<JMXDriverConnectionWrapper> getJMXConnections()
  // Wait for the specified number of JMX connections to be in the specified state
  public List<JMXDriverConnectionWrapper> awaitJMXConnections(
    Operator operator, int nbConnections, boolean connectedOnly)
  // Wait for the specified number of JMX connections to be in the specified state,
  // or the specified timeout to expire, whichever happens first
  public List<JMXDriverConnectionWrapper> awaitJMXConnections(
    Operator operator, int nbConnections, long timeout, boolean connectedOnly)
}
Note that the JMX pool core size, when left unspecified, defaults to 1. Otherwise, it is defined in the configuration as:
When discovery is enabled:
jppf.jmx.pool.size = 5
When discovery is disabled:
driver-1.jppf.jmx.pool.size = 5
Also note that the driver host for a JMX connection is the same as JPPFConnectionPool.getDriverHost(). In the
same way, to determine whether a JMX connection is secure, JPPFConnectionPool.isSSLEnabled() should be
used.
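For instance, the following sketch obtains a JMX connection from a pool and prints where it points to, reusing the
accessors shown above along with JPPFConnectionPool.getDriverHost():
JPPFConnectionPool pool = client.awaitActiveConnectionPool();
// get a connected JMX wrapper from the associated JMX pool
JMXDriverConnectionWrapper jmx = pool.getJmxConnection();
// the JMX host is the same as the pool's driver host
System.out.println("JMX connection to " + pool.getDriverHost() + ":" + pool.getJmxPort());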
4.9.5 Exploring the connection pools
The JPPFClient class, or more exactly its super-super class AbstractJPPFClient, provides a number of methods to
discover and explore the connection pools currently handled by the client:
public class JPPFClient extends AbstractGenericClient { ... }
public abstract class AbstractGenericClient extends AbstractJPPFClient { ... }
public abstract class AbstractJPPFClient
    implements ClientConnectionStatusListener, AutoCloseable {
  // Find the connection pool with the specified priority and id
  public JPPFConnectionPool findConnectionPool(int priority, int poolId)
  // Find the connection pool with the specified id
  public JPPFConnectionPool findConnectionPool(int poolId)
  // Find the connection pool with the specified name
  public JPPFConnectionPool findConnectionPool(String name)
  // Find the connection pools whose name matches the specified regular expression
  public List<JPPFConnectionPool> findConnectionPools(String name)
  // Find the connection pools that have at least one connection matching
  // one of the specified statuses
  public List<JPPFConnectionPool> findConnectionPools(
    JPPFClientConnectionStatus... statuses)
  // Get a set of existing connection pools with the specified priority
  public List<JPPFConnectionPool> getConnectionPools(int priority)
  // Get a list of all priorities for the currently existing pools in descending order
  public List<Integer> getPoolPriorities()
  // Get a list of existing connection pools, ordered by descending priority
  public List<JPPFConnectionPool> getConnectionPools()
  // Get a pool with the highest possible priority that has at least 1 active connection
  public JPPFConnectionPool getConnectionPool()
  // Get the connection pools that pass the specified filter
  public List<JPPFConnectionPool> findConnectionPools(
    ConnectionPoolFilter<JPPFConnectionPool> filter)
}
Note that the connection pools are held in a multimap-like data structure, where the key is the pool priority sorted in
descending order (highest priority first). Consequently, all getXXX() and findXXX() methods which return a list of
connection pools are guaranteed to have the resulting elements of the list sorted by descending priority.
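For example, the following sketch relies on this ordering guarantee:
// all existing pools, ordered by descending priority
List<JPPFConnectionPool> pools = client.getConnectionPools();
// the first element therefore has the highest priority
JPPFConnectionPool highestPriorityPool = pools.get(0);
// pool with the highest priority that has at least 1 active connection
JPPFConnectionPool pool = client.getConnectionPool();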
The last findConnectionPools() method provides a generic way of filtering the existing connection pools, by making
use of a ConnectionPoolFilter, defined as follows:
public interface ConnectionPoolFilter<E> {
// Determine whether this filter accepts the specified connection pool
boolean accepts(E pool);
}
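As an illustration, here is a sketch of a filter which only accepts pools of SSL-enabled connections, using the pool's
isSSLEnabled() method mentioned earlier in this section:
List<JPPFConnectionPool> sslPools = client.findConnectionPools(
  new ConnectionPoolFilter<JPPFConnectionPool>() {
    @Override public boolean accepts(JPPFConnectionPool pool) {
      // keep only pools whose connections are secured with SSL
      return pool.isSSLEnabled();
    }
  });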
In addition to this, JPPFClient provides a set of methods which wait until one or more connection pools fulfill a specified
condition, and return a list of pools which satisfy the condition:
public class JPPFClient extends AbstractGenericClient {
// Wait for at least one connection pool with at least one connection in ACTIVE status
public JPPFConnectionPool awaitActiveConnectionPool()
// Wait for at least one connection pool with at least one connection
// in ACTIVE or EXECUTING status
public JPPFConnectionPool awaitWorkingConnectionPool()
// Wait for at least one connection pool with at least one connection
// in one of the specified statuses
public JPPFConnectionPool awaitConnectionPool(JPPFClientConnectionStatus...statuses)
// Wait for at least one connection pool with at least one connection in one of the
// specified statuses or the specified timeout (in ms) expires, whichever happens first
public JPPFConnectionPool awaitConnectionPool(
long timeout, JPPFClientConnectionStatus...statuses)
// Wait for at least one connection pool with at least one connection in one of the
// specified statuses or the specified timeout (in ms) expires, whichever happens first
public List<JPPFConnectionPool> awaitConnectionPools(
long timeout, JPPFClientConnectionStatus...statuses)
}
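For example, assuming at least one driver is reachable, a client could wait for a working pool before submitting a job:
try (JPPFClient client = new JPPFClient()) {
  // block until a pool with at least one ACTIVE or EXECUTING connection is found
  JPPFConnectionPool pool = client.awaitWorkingConnectionPool();
  // ... create a job and submit it ...
}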
4.9.6 Connection Pool Events
The JPPF client API allows the registration or unregistration of listeners to connection pool events: connection pools
added to or removed from the client, or connections added to or removed from a connection pool. This can be done in two
ways:
1) From a JPPFClient constructor:
public class JPPFClient extends AbstractGenericClient {
  // Initialize this client with an automatically generated application UUID
  public JPPFClient(final ConnectionPoolListener... listeners)
  // Initialize this client with the specified UUID and listeners
  public JPPFClient(final String uuid, final ConnectionPoolListener... listeners)
}
2) Using the related add and remove methods in the grandparent class of JPPFClient, AbstractJPPFClient:
public abstract class AbstractJPPFClient
implements ClientConnectionStatusListener, AutoCloseable {
// Add a listener to the list of listeners to this client
public void addConnectionPoolListener(final ConnectionPoolListener listener)
// Remove a listener from the list of listeners to this client
public void removeConnectionPoolListener(final ConnectionPoolListener listener)
}
As we can see in the method signatures, the listeners implement the interface ConnectionPoolListener, which is defined as:
public interface ConnectionPoolListener extends EventListener {
// Called when a new connection pool is created
void connectionPoolAdded(ConnectionPoolEvent event);
// Called when a connection pool is removed
void connectionPoolRemoved(ConnectionPoolEvent event);
// Called when a new connection is created
void connectionAdded(ConnectionPoolEvent event);
// Called when a connection is removed
void connectionRemoved(ConnectionPoolEvent event);
}
Note that, if you do not wish to implement all the methods in ConnectionPoolListener, you can instead extend the adapter
class ConnectionPoolListenerAdapter, which implements each method of the interface as an empty method.
All notification methods receive an event of type ConnectionPoolEvent, defined as follows:
public class ConnectionPoolEvent extends EventObject {
// Get the source of this event
public JPPFConnectionPool getConnectionPool()
// Get the connection that triggered this event, if any
public JPPFClientConnection getConnection()
}
Please note that, in the case of a connectionPoolAdded() or connectionPoolRemoved() notification, the method
getConnection() will return null, since no connection is involved.
4.9.7 Putting it all together
We will illustrate how client, connection pool events and connection events fit together, with an example which prints the
status of all connections created by the client:
// this status listener prints the old and new status of each connection
final ClientConnectionStatusListener myStatusListener =
    new ClientConnectionStatusListener() {
@Override
public void statusChanged(ClientConnectionStatusEvent event) {
// obtain the client connection from the event
JPPFClientConnection connection =
(JPPFClientConnection) event.getClientConnectionStatusHandler();
// get the new and old status
JPPFClientConnectionStatus newStatus = connection.getStatus();
JPPFClientConnectionStatus oldStatus = event.getOldStatus();
// print the connection name, old status and new status
System.out.println("Connection " + connection.getName() + " status changed from "
+ oldStatus + " to " + newStatus);
}
};
// create a connection pool listener which registers
// a status listener on new connections
ConnectionPoolListener myPoolListener = new ConnectionPoolListenerAdapter() {
@Override
public void connectionAdded(final ConnectionPoolEvent event) {
// obtain the connection pool and client connection from the event
JPPFConnectionPool pool = event.getConnectionPool();
JPPFClientConnection connection = event.getConnection();
System.out.println("connection " + connection + " added to pool " + pool);
// add the status listener to the connection
connection.addClientConnectionStatusListener(myStatusListener);
}
@Override
public void connectionRemoved(final ConnectionPoolEvent event) {
// obtain the connection pool and client connection from the event
JPPFConnectionPool pool = event.getConnectionPool();
JPPFClientConnection connection = event.getConnection();
System.out.println("connection " + connection + " removed from pool " + pool);
// remove the status listener from the connection
connection.removeClientConnectionStatusListener(myStatusListener);
}
};
// create a JPPFClient with our connection pool listener
JPPFClient client = new JPPFClient(myPoolListener);
4.10 Notifications of client job queue events
The JPPF client allows receiving notifications when jobs are added to or removed from its queue. To this effect, the
AbstractGenericClient class (the super class of JPPFClient) provides methods to register or unregister listeners for these
notifications:
public abstract class AbstractGenericClient extends AbstractJPPFClient {
  // Register the specified listener to receive client queue event notifications
  public void addClientQueueListener(ClientQueueListener listener)
  // Unregister the specified listener
  public void removeClientQueueListener(ClientQueueListener listener)
}
As we can see, these methods accept listeners of type ClientQueueListener, defined as follows:
public interface ClientQueueListener extends EventListener {
// Called to notify that a job was added to the queue
void jobAdded(ClientQueueEvent event);
// Called to notify that a job was removed from the queue
void jobRemoved(ClientQueueEvent event);
}
The jobAdded() and jobRemoved() methods are notifications of events of type ClientQueueEvent:
public class ClientQueueEvent extends EventObject {
  // Get the JPPF client source of this event
  public JPPFClient getClient()
  // Get the job that was added or removed
  public JPPFJob getJob()
  // Get all the jobs currently in the queue
  public List<JPPFJob> getQueuedJobs()
  // Get the size of this job queue
  public int getQueueSize()
}
Here is an example usage, which adapts the size of a client connection pool based on the number of jobs in the queue:
JPPFClient client = new JPPFClient();
JPPFConnectionPool pool;
// wait until "myPool" is initialized
while ((pool = client.findConnectionPool("myPool")) == null) Thread.sleep(20L);
final JPPFConnectionPool thePool = pool;
// register a queue listener that will adapt the pool size
client.addClientQueueListener(new ClientQueueListener() {
@Override public void jobAdded(ClientQueueEvent event) {
int n = event.getQueueSize();
// grow the connection pool
JPPFConnectionPool pool = event.getClient().findConnectionPool("myPool");
if (n > pool.getMaxSize()) pool.setMaxSize(n);
}
@Override public void jobRemoved(ClientQueueEvent event) {
int n = event.getQueueSize();
// shrink the connection pool
JPPFConnectionPool pool = event.getClient().findConnectionPool("myPool");
if (n < pool.getMaxSize()) pool.setMaxSize(n);
}
});
// ... submit jobs ...
4.11 Submitting multiple jobs concurrently
In this section, we present a number of ways to design an application such that it can execute multiple jobs
concurrently, using a single JPPFClient instance. These can be seen as common reusable patterns, covering the most
frequent use cases where submission and processing of multiple jobs in parallel is needed.
The patterns presented here all make the assumption that job submissions are performed through a single instance of
JPPFClient. It is indeed the recommended way to work with JPPF, since it benefits the most from the built-in features of
the JPPF client:
– thread safety
– ability to connect to multiple remote drivers, to the same driver multiple times, or any combination of these
– load-balancing between available connections
– ability to submit a job over multiple connections for increased performance
– fine-grained filtering of eligible connections for each job, via the job's client-side execution policy
– connection failover strategies defined via the connection pools priorities
4.11.1 Base requirement: multiple connections
For a JPPF client to be able to process multiple jobs in parallel, it is mandatory that the client holds multiple connections,
whether to a single server, multiple servers or any combination of these. The number of connections determines how
many jobs can be sent concurrently to a server. If only one connection is available, then only one job at a time will actually
be processed by the JPPF grid.
When there are more jobs than available connections, the remainder of the jobs will be queued in the client, waiting for a
connection to become available for submission of a new job.
4.11.1.1 Defining connections in the client configuration
In the configuration of a JPPF client, a connection pool is defined as follows:
# declaration of the named drivers to connect to
jppf.drivers = driver1
# definition of the "driver1" connection pool
driver1.jppf.server.host = 192.168.1.52
driver1.jppf.server.port = 11111
# define a connection pool size of 4
driver1.jppf.pool.size = 4
When the JPPFClient is created, it will open 4 connections to the driver, allowing us to submit up to 4 jobs concurrently.
4.11.1.2 Setting a connection pool size programmatically
For each of the named drivers declared in the configuration, the JPPF client will create a connection pool, with the
specified size or number of connections. The JPPFClient class and its ancestor classes, AbstractGenericClient and
AbstractJPPFClient, provide many ways to find or look up connection pools, or even wait for them to be in a
desired state. This is done with one or more of the getConnectionPools(...), findConnectionPools(...),
awaitXXX(...) methods in these classes. This can be used to retrieve a connection pool from a JPPFClient instance
and change its size dynamically, as in this example:
try (JPPFClient client = new JPPFClient()) {
// wait for a connection pool that has at least one connection
// in the active state (i.e. available for job submission)
JPPFConnectionPool pool = client.awaitActiveConnectionPool();
// change the pool size to 10
pool.setSize(10);
}
4.11.2 Job submissions from multiple threads
This pattern shows how concurrent jobs can be submitted from multiple threads using the same JPPFClient instance. Here,
we use blocking jobs: since each job is submitted in its own thread, we can afford to block that thread
until the job completes:
public void multipleThreadsBlockingJobs() {
  // a pool of threads that will submit the jobs and retrieve their results
  ExecutorService executor = Executors.newFixedThreadPool(4);
  try (JPPFClient client = new JPPFClient()) {
    // handles for later retrieval of the job submissions results
    List<Future<List<Task<?>>>> futures = new ArrayList<>();
    for (int i=0; i<4; i++) {
      JPPFJob job = new JPPFJob();
      // ... set attributes and add tasks ...
      // submit the job in a separate thread
      futures.add(executor.submit(new JobSubmitter(client, job)));
    }
    for (Future<List<Task<?>>> future: futures) {
      try {
        // wait until each job has completed and retrieve its results
        List<Task<?>> results = future.get();
        // ... process the job results ...
        processResults(results);
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }
  executor.shutdown();
}
The class JobSubmitter is defined as follows:
public class JobSubmitter implements Callable<List<Task<?>>> {
  private final JPPFClient client;
  private final JPPFJob job;

  public JobSubmitter(JPPFClient client, JPPFJob job) {
    this.client = client;
    this.job = job;
  }

  @Override public List<Task<?>> call() throws Exception {
    // just submit the job
    return client.submitJob(job);
  }
}
4.11.3 Multiple non-blocking jobs from a single thread
Here, we take advantage of the asynchronous nature of non-blocking jobs to write a much less cumbersome version of
the previous pattern:
public void singleThreadNonBlockingJobs() {
try (final JPPFClient client = new JPPFClient()) {
// holds the submitted jobs for later retrieval of their results
List<JPPFJob> jobs = new ArrayList<>();
// submit the jobs without blocking the current thread
for (int i=0; i<4; i++) {
JPPFJob job = new JPPFJob();
job.setBlocking(false);
// ... set other attributes and add tasks ...
jobs.add(job);
client.submitJob(job); // non-blocking operation
}
// get and process the jobs results
for (JPPFJob job: jobs) {
// synchronize on each job's completion: this is a blocking operation
List<Task<?>> results = job.awaitResults();
processResults(results); // process the job results
}
} catch(Exception e) {
e.printStackTrace();
}
}
4.11.4 Fully asynchronous processing
Here, we use a JobListener to retrieve and process the results of the jobs via job life cycle notifications. The only
synchronization occurs in the main method, to await on the global completion of all jobs:
public void asynchronousNonBlockingJobs() {
  try (final JPPFClient client = new JPPFClient()) {
    int nbJobs = 4;
    // synchronization helper that tells us when all jobs have completed
    final CountDownLatch countDown = new CountDownLatch(nbJobs);
    for (int i=0; i<nbJobs; i++) {
      JPPFJob job = new JPPFJob();
      job.setBlocking(false);
      // process the results asynchronously in a job listener
      job.addJobListener(new JobListenerAdapter() {
        @Override public void jobEnded(JobEvent event) {
          List<Task<?>> results = event.getJob().getAllResults();
          processResults(results); // process the job results
          // decrease the jobs count down
          // when the count reaches 0, countDown.await() will exit immediately
          countDown.countDown();
        }
      });
      // ... set other attributes, add tasks, submit the job ...
      client.submitJob(job);
    }
    // wait until all jobs are complete, i.e. until the count down reaches 0
    countDown.await();
  } catch(Exception e) {
    e.printStackTrace();
  }
}
4.11.5 Job streaming
Job streaming occurs when an application is continuously creating and executing jobs, based on a potentially infinite
source of data. The main problem to overcome in this use case is when jobs are created much faster than they are
executed, thus potentially filling the memory until an OutOfMemoryError occurs. A possible solution to this is to build a job
provider with a limiting factor, which determines the maximum number of jobs that can be running at any given time.
Additionally, a Java Iterator fits the streaming pattern particularly well, thus our job provider will
implement the Iterable and Iterator interfaces:
public class JobProvider extends JobListenerAdapter
    implements Iterable<JPPFJob>, Iterator<JPPFJob> {
// limit to the maximum number of concurrent jobs
private int concurrencyLimit;
// current count of concurrent jobs
private int currentNbJobs = 0;
public JobProvider(int concurrencyLimit) {
this.concurrencyLimit = concurrencyLimit;
}
// implementation of Iterator
@Override public synchronized boolean hasNext() {
boolean hasMoreJobs = false;
// ... compute hasMoreJobs, e.g. check if there is any more data to read
return hasMoreJobs;
}
@Override public synchronized JPPFJob next() {
// wait until the number of running jobs is less than the concurrency limit
while (currentNbJobs >= concurrencyLimit) {
try {
wait();
} catch (Exception e) {
e.printStackTrace();
}
}
return buildJob();
}
@Override public void remove() {
throw new UnsupportedOperationException("remove() is not supported");
}
private synchronized JPPFJob buildJob() {
JPPFJob job = new JPPFJob();
// ... build the tasks by reading data from a file, a database, etc...
// ... add the tasks to the job ...
job.setBlocking(false);
// add a listener to update the concurrent jobs count when the job ends
job.addJobListener(this);
// increase the count of concurrently running jobs
currentNbJobs++;
return job;
}
// implementation of JobListener
@Override synchronized public void jobEnded(JobEvent event) {
processResults(event.getJob().getAllResults()); // process the job results
// decrease the count of concurrently running jobs
currentNbJobs--;
// wake up the threads waiting in next()
notifyAll();
}
// implementation of Iterable
@Override public Iterator iterator() { return this; }
protected void processResults(List<Task<?>> results) { /* ... */ }
}
Note the use of a JobListener to ensure the current count of jobs is properly updated, so that the provider can create
new jobs from its data source. It is also used to process the job results asynchronously.
Now that we have a job provider, we can use it to submit the jobs it creates to a JPPF grid:
public void jobStreaming() {
try (JPPFClient client = new JPPFClient()) {
// create the job provider with a limiting concurrency factor
JobProvider jobProvider = new JobProvider(4);
// build and submit the provided jobs until no more is available
for (JPPFJob job: jobProvider) {
client.submitJob(job);
}
} catch(Exception e) {
e.printStackTrace();
}
}
4.11.5.1 The AbstractJPPFJobStream helper class
Given the potential complexity of the job streaming pattern, we found it useful to provide a helper class which alleviates
the work of a developer by implementing all the wiring and internal state transitions, such that the developers can solely
focus on the specifics of the jobs they want to submit. The abstract class AbstractJPPFJobStream serves this purpose. It
is defined as follows:
public abstract class AbstractJPPFJobStream extends JobListenerAdapter
    implements Iterable<JPPFJob>, Iterator<JPPFJob>, AutoCloseable {
  // Initialize this job provider with a concurrency limit
  public AbstractJPPFJobStream(final int concurrencyLimit)
  // Determine whether there is at least one more job in the stream
  // This method must be overridden in subclasses
  public abstract boolean hasNext()
  // Get the next job in the stream
  public synchronized JPPFJob next() throws NoSuchElementException
  // Create the next job in the stream, along with its tasks
  // This method must be overridden in subclasses and is called from next()
  protected abstract JPPFJob createNextJob()
  // This operation is not supported
  public void remove() throws UnsupportedOperationException
  // Update the state of this job stream and process the results of a job asynchronously
  public void jobEnded(final JobEvent event)
  // Callback invoked from jobEnded() when a job is complete
  // This method must be overridden in subclasses
  protected abstract void processResults(JPPFJob job)
  // implementation of Iterable
  public Iterator<JPPFJob> iterator()
  // Close this stream and release the underlying resources it uses
  // This method must be overridden in subclasses
  public abstract void close() throws Exception
  // Determine whether any job is still being executed
  public synchronized boolean hasPendingJob()
  // Get the number of executed jobs
  public synchronized int getJobCount()
  // Get the number of executed tasks
  public synchronized int getTaskCount()
}
This class is designed to be subclassed and, to this effect, we have outlined the four abstract methods that must be
overridden in any subclass. We can see how this simplifies the work of any implementation. Let's re-implement the
previous example by subclassing AbstractJPPFJobStream:
previous example by subclassing AbstractJPPFJobStream:
public class JobProvider extends AbstractJPPFJobStream {
public JobProvider(int concurrencyLimit) {
super(concurrencyLimit);
}
@Override public synchronized boolean hasNext() {
boolean hasMoreJobs = false;
// ... compute hasMoreJobs, e.g. check if there is any more data to read
return hasMoreJobs;
}
@Override protected JPPFJob createNextJob() {
JPPFJob job = new JPPFJob();
// ... build the tasks by reading data from a file, a database, etc...
// ... add the tasks to the job and return it ...
return job;
}
@Override protected void processResults(JPPFJob job) { /* ... process the job results ... */ }
@Override public void close() throws Exception {
// close a file, database connection, etc...
}
}
4.11.6 Dedicated sample
For a fully working and documented example of the patterns seen in the previous sections, you are invited to explore the
dedicated Concurrent Jobs demo.
4.12 Jobs persistence in the driver
As of JPPF 6.0, drivers can persist the jobs they receive, along with their results, in a permanent store. This adds major
new capabilities to the drivers, in particular:
– automatic recovery of the jobs in case of driver crashes and resubmission to completion after driver restart, without any
external intervention
– the ability to submit jobs from a JPPF client, then check their completion and retrieve their results from a separate client
– the ability to retrieve jobs on demand from a persistent store and resubmit them to completion
To achieve this, JPPF provides the following components:
– a pluggable persistence facility in the driver, with ready to use, built-in implementations
– extensions to the job SLA specifying if and how the persistence of each job should be handled
– a client-side facility to manage, retrieve and monitor persisted jobs
These items are detailed in the next sections.
The persistence facility relies on an implementation of the JobPersistence interface, whose role is to store, load, delete
and query the job elements that are persisted. A job element can be one of 4 types:
– a job header element, which includes the job's uuid, name, SLA, and metadata, but also the information required for job
routing and scheduling
– a data provider element, if any is present
– an element for each task, before its execution
– an element for each executed task returned by the nodes, also known as task execution result
Note: the types of job elements are represented by the PersistenceObjectType enum.
Each job element is stored as binary data which represents a serialized object graph. Where and how it is stored depends
solely on the JobPersistence implementation. It can be on a file system, a relational database, cloud storage facility, etc.
4.12.1 Job persistence specification
The persistence of a job is specified via a PersistenceSpec object, defined as follows:
public class PersistenceSpec implements Serializable {
// Determine whether the job is persisted in the driver. Defaults to false
public boolean isPersistent()
// Specify whether the job is persisted in the driver
public PersistenceSpec setPersistent(boolean persistent)
// Determine whether the driver should automatically execute the persisted job
// after a restart. Defaults to false
public boolean isAutoExecuteOnRestart()
// Specify whether the driver should automatically execute the job after a restart
public PersistenceSpec setAutoExecuteOnRestart(boolean autoExecuteOnRestart)
// Determine whether the persisted job should be deleted from the store upon
// completion. Defaults to true
public boolean isDeleteOnCompletion()
// Determine whether the job should be deleted from the store upon completion
public PersistenceSpec setDeleteOnCompletion(boolean deleteOnCompletion)
}
As we can see, instances of this class manage three boolean flags which specify whether the driver will persist the job
and what it will do with a persisted job when it completes or when a driver restart occurs.
The "persistent" flag determines whether the job is persisted at all. By default, it is set to false, which means that a job is
not persisted by default. When set to true, the driver will persist the job elements it receives from the client, and later on
the execution results received from the nodes. When a job is not configured as persistent, it will be processed without any
overhead. If this flag is set to false, none of the other flags has any effect.
The "delete on completion" flag determines whether the job should be removed from the store when it completes. This
addresses situations where a client remains connected to the driver and awaits the job results for further processing,
while you still want the driver to be able to recover the job in case of a crash followed by a restart. Since the client
receives the results, they no longer need to be kept in the permanent store. This flag is set to true by default.
The "auto execute on restart" flag tells a driver that, upon restart, it should automatically resubmit the job's unexecuted
tasks until the job completes. At startup, the driver will retrieve all the jobs with this flag set to true that have not yet
completed, and resubmit them. This flag is set to false by default.
Note: when the "auto execute on restart" flag is true, the nodes to which unexecuted tasks are dispatched still need
access to all the required classes for these tasks. The easiest way to achieve this is to add the corresponding jar files
and class folders to the driver's classpath and let the nodes' distributed class loader find them there.
As an example, here is how we would configure a persistent job that should be automatically executed upon driver restart
and deleted from the store upon completion:
JPPFJob job = new JPPFJob();
job.getSLA().getPersistenceSpec()
.setPersistent(true)
.setAutoExecuteOnRestart(true)
.setDeleteOnCompletion(true);
Note: if your intended usage scenario is to submit a job, close the client application, then query the job results later on,
you MUST set the job's cancelUponClientDisconnect flag to false. This will prevent the job from being cancelled when
disconnecting the client.
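Putting the note above into practice, a job intended to be submitted now and queried later from a separate client could be
configured along these lines (a sketch combining the persistence flags with the job SLA's cancelUponClientDisconnect
attribute):
JPPFJob job = new JPPFJob();
// persist the job and keep it in the store after completion
job.getSLA().getPersistenceSpec()
  .setPersistent(true)
  .setDeleteOnCompletion(false);
// prevent cancellation of the job when the client disconnects
job.getSLA().setCancelUponClientDisconnect(false);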
4.12.2 Managing persisted jobs
In order to manage and query the jobs in a persistence store, JPPF provides a client-side facility, based on JMX
management APIs, which connects to a remote driver and accesses its persistence store. This facility is implemented in
the JPPFDriverJobPersistence class, defined as follows:
public class JPPFDriverJobPersistence {
// Initialize this persisted job manager with the specified driver JMX connection
public JPPFDriverJobPersistence(JMXDriverConnectionWrapper jmx)
// List the persisted jobs that match the provided job selector
public List<String> listJobs(JobSelector selector) throws Exception
// Delete the persisted job with the specified uuid.
// This method is equivalent to deleteJobs(new JobUuidSelector(uuid))
public boolean deleteJob(String uuid) throws Exception
// Delete the persisted jobs that match the provided job selector
public List<String> deleteJobs(JobSelector selector) throws Exception
// Retrieve and rebuild the persisted job with the specified uuid
// This method is equivalent to retrieveJob(uuid, false)
public JPPFJob retrieveJob(String uuid) throws Exception
// Get the description of the job with the specified uuid.
// This method retrieves the job's uuid, name, number of tasks, SLA and metadata
public JPPFDistributedJob getJobDescription(String uuid) throws Exception
// Determines whether the job has completed and all execution results are available
public boolean isJobComplete(String uuid) throws Exception
}
Note that the constructor for this class takes a JMXDriverConnectionWrapper, so that it can establish a JMX connection
to the remote driver. A JMXDriverConnectionWrapper can be obtained in 2 ways:
- by creating it directly, for example:
JMXDriverConnectionWrapper jmx =
new JMXDriverConnectionWrapper("my.driver.com", 11198, false);
jmx.connectAndWait(3000L);
JPPFDriverJobPersistence jobPersistence = new JPPFDriverJobPersistence(jmx);
- or by getting it from a JPPFClient, for example:
JPPFClient client = new JPPFClient();
JMXDriverConnectionWrapper jmx =
client.awaitWorkingConnectionPool().awaitWorkingJMXConnection();
JPPFDriverJobPersistence jobPersistence = new JPPFDriverJobPersistence(jmx);
4.12.2.1 Listing the persisted jobs
This operation can be achieved with the listJobs() method of JPPFDriverJobPersistence. This method takes a job
selector and returns a list of the uuids of the persisted jobs that match the selector. If no persisted job matches, then the
returned list is empty. The uuids in the list can then be reused in other methods of JPPFDriverJobPersistence.
Example usage:
JPPFClient client = new JPPFClient();
// create the job persistence manager instance
JPPFDriverJobPersistence jobPersistence = new JPPFDriverJobPersistence(
client.awaitWorkingConnectionPool().awaitWorkingJMXConnection());
// list the uuids of all currently persisted jobs
List<String> uuids = jobPersistence.listJobs(JobSelector.ALL_JOBS);
for (String uuid: uuids) {
// do something with each job uuid
}
4.12.2.2 Retrieving a job
To retrieve a job from a persistence store, you use the retrieveJob() method of JPPFDriverJobPersistence. This
method takes a job uuid and returns a JPPFJob that can be used as if it had been created directly on the client side. For
instance, you can resubmit it if it still has unexecuted tasks, or process its results if it has completed.
Here is an example usage:
JPPFClient client = new JPPFClient();
JPPFDriverJobPersistence jobPersistence = ...;
// list the uuids of all currently persisted jobs
List<String> uuids = jobPersistence.listJobs(JobSelector.ALL_JOBS);
// retrieve all the persisted jobs and resubmit those that haven't completed yet
LinkedList<JPPFJob> jobs = new LinkedList<>();
for (String uuid: uuids) {
JPPFJob job = jobPersistence.retrieveJob(uuid);
// delete the job from the store, it will be stored again if resubmitted
jobPersistence.deleteJob(uuid);
// if the job has unexecuted tasks, resubmit it
if (job.unexecutedTaskCount() > 0) {
jobs.addLast(job);
job.setBlocking(false); // ensure asynchronous submission
client.submitJob(job);
} else {
jobs.addFirst(job);
}
}
// process the results of all jobs
for (JPPFJob job: jobs) {
// if the job has already completed, this method returns immediately
List<Task<?>> results = job.awaitResults();
// ... process the results ...
}
4.12.2.3 Deleting one or more jobs
To delete one or more jobs from the persistence store, you can use either:
– the deleteJob(String) method to delete a single job at a time. This method takes a job uuid and returns true if a
job with this uuid was found and effectively deleted, false otherwise.
– or the deleteJobs(JobSelector) method. This method takes a JobSelector and returns a list of the uuids of the
jobs that were found and effectively deleted.
The following example deletes all persisted jobs using deleteJob(String) in a loop:
JPPFDriverJobPersistence jobPersistence = ...;
// list all currently persisted jobs
List<String> uuids = jobPersistence.listJobs(JobSelector.ALL_JOBS);
// delete all the persisted jobs
for (String uuid: uuids) {
// delete the job from the store and check success
if (jobPersistence.deleteJob(uuid)) {
System.out.println("sucessfully deleted job with uuid = " + uuid);
}
}
This example performs exactly the same thing using deleteJobs(JobSelector) without a loop:
JPPFDriverJobPersistence jobPersistence = ...;
// delete all persisted jobs
List<String> deletedUuids = jobPersistence.deleteJobs(JobSelector.ALL_JOBS);
System.out.println("successfully deleted jobs with uuids = " + deletedUuids);
4.12.2.4 Getting information on the jobs
JPPFDriverJobPersistence provides 2 methods to obtain information on a persisted job:
– isJobComplete() determines whether a job has completed its execution. It takes a job uuid as input and returns true
if the job has completed, false otherwise
– getJobDescription() provides detailed information on the job name, number of tasks, SLA and metadata. It takes a
job uuid and returns an instance of JPPFDistributedJob which encapsulates the details.
The following example illustrates a possible usage of these methods:
JPPFDriverJobPersistence jobPersistence = ...;
// retrieve all the jobs submitted by "john.doe"
String script = "'john.doe'.equals(jppfJob.getMetadata().getParameter('submitter'));";
JobSelector selector = new ScriptedJobSelector("javascript", script);
List<String> uuids = jobPersistence.listJobs(selector);
for (String uuid: uuids) {
if (jobPersistence.isJobComplete(uuid)) {
// ... process completed job ...
} else {
JPPFDistributedJob jobDesc = jobPersistence.getJobDescription(uuid);
// ... do something with the job description ...
}
}
4.12.3 Configuring jobs persistence in the driver
A job persistence facility is essentially a pluggable implementation of the JobPersistence interface. A driver supports a
single persistence implementation at a time, configured through the "jppf.job.persistence" configuration property:
jppf.job.persistence = <implementation class name> param1 ... paramN
where:
– "implementation class name" is the fully qualified class name of the JobPersistence implementation
– "param1 ... paramN" are optional string parameters used by the persistence implementation
For example, to configure the default file persistence with a root directory named "persistence" in the driver's working
directory, we would configure the following:
jppf.job.persistence = org.jppf.job.persistence.impl.DefaultFilePersistence persistence
If no job persistence is configured, the driver will default to the default file persistence with a root directory "persistence",
exactly as in the example above.
4.12.4 Built-in persistence implementations
4.12.4.1 Default file persistence
The default file persistence is a file-based persistent store for jobs. The corresponding implementation class is
DefaultFilePersistence.
The store's structure is made of a root directory, under which there is one directory per job, named after the job's uuid.
Each job directory contains:
– a file named "header.data" for the job header
– a file named "data_provider.data" for the job's data_provider
– a file named "task-i.data" for each task i of the job, where i represents the position of the task in the job
– a file "result-i.data" for each task result i received from a node, where i represents the position of the task in the job
For example, if we define the file persistence with a root directory named "persistence" in the driver's working directory:
jppf.job.persistence = org.jppf.job.persistence.impl.DefaultFilePersistence persistence
Let's say we submitted a job with two tasks which ran to completion, with a uuid of "my_job_uuid". The persistence store
structure would then look like this:
JPPF-6.0-driver
|_persistence
|_my_job_uuid
|_header.data
|_data_provider.data
|_task-0.data
|_task-1.data
|_result-0.data
|_result-1.data
Important note: when a job element (header, data provider, task or result) is stored, its data is first put into a temporary
file with the ".tmp" extension. When the data is fully stored in the file, and only then, the file is renamed with a ".data"
extension. This avoids ending up with incomplete or corrupted files that would prevent JPPF from restoring the job.
The built-in file persistence is configured as:
jppf.job.persistence = org.jppf.job.persistence.impl.DefaultFilePersistence <root_dir>
where root_dir can be either an absolute file path, or a path relative to the driver's working directory. If it is omitted, it
defaults to "persistence".
Tip: you may also use system properties substitutions in your configuration to specify common paths. For instance, to
point the file persistence to a root directory named "jppf_jobs" in the current user's home directory, we could write:
jppf.job.persistence = org.jppf.job.persistence.impl.DefaultFilePersistence \
${sys.user.home}/jppf_jobs
4.12.4.2 Default database persistence
The default database persistence is a job persistence implementation which stores jobs in a single database table. Its
corresponding implementation class is DefaultDatabasePersistence, and it is configured as follows:
jppf.job.persistence = org.jppf.job.persistence.impl.DefaultDatabasePersistence \
  <table_name> <datasource_name>
where:
– "table_name" is the name of the table in which jobs are stored
– "datasource_name" is the name of a datasource configured via the database services facitlity
If both table_name and datasource_name are omitted, they will default to "JOB_PERSISTENCE" and "job_persistence",
respectively. If datasource_name is omitted, it will default to "job_persistence".
The following is an example configuration with a table named "JPPF_JOBS" and a datasource named "jobDS":
# persistence definition
jppf.job.persistence = org.jppf.job.persistence.impl.DefaultDatabasePersistence \
JPPF_JOBS jobDS
# datasource definition
jppf.datasource.jobs.name = jobDS
jppf.datasource.jobs.driverClassName = com.mysql.jdbc.Driver
jppf.datasource.jobs.jdbcUrl = jdbc:mysql://localhost:3306/testjppf
jppf.datasource.jobs.username = testjppf
jppf.datasource.jobs.password = testjppf
jppf.datasource.jobs.minimumIdle = 5
jppf.datasource.jobs.maximumPoolSize = 10
jppf.datasource.jobs.connectionTimeout = 30000
jppf.datasource.jobs.idleTimeout = 600000
The table structure is as follows:
CREATE TABLE <table_name> (
UUID varchar(250) NOT NULL,
TYPE varchar(20) NOT NULL,
POSITION int NOT NULL,
CONTENT blob NOT NULL,
PRIMARY KEY (UUID, TYPE, POSITION)
);
Where:
– the UUID column represents the job uuid
– the TYPE column represents the type of object, taken from the PersistenceObjectType enum
– the POSITION column represents the object's position in the job, if TYPE is 'task' or 'task_result', otherwise -1
– the CONTENT column represents the serialized job element
Very important: the table definition should be adjusted depending on the database you are using. For instance, in
MySQL the BLOB type has a size limit of 64 KB, thus storing job elements larger than this size will always fail. In this
use case, the MEDIUMBLOB or LONGBLOB type should be used instead.
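For instance, a MySQL-friendly variant of the DDL above, assuming the default table name, might use a larger blob type:
CREATE TABLE JOB_PERSISTENCE (
  UUID varchar(250) NOT NULL,
  TYPE varchar(20) NOT NULL,
  POSITION int NOT NULL,
  -- MEDIUMBLOB accommodates job elements of up to 16 MB
  CONTENT mediumblob NOT NULL,
  PRIMARY KEY (UUID, TYPE, POSITION)
);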
If the table does not exist, JPPF will attempt to create it. If this fails for any reason, for instance if the database user does
not have sufficient privileges, then persistence will be disabled.
It is possible to specify the path to the file that contains the DDL statement(s) to create the table, like so:
# path to the file or resource containing the DDL statements to create the table
jppf.job.persistence.ddl.location = <ddl_location>
Here, <ddl_location> is the path to either a file in the file system or a resource in the class path. The file system is always
looked up first and, if no file is found, JPPF then looks up the driver's classpath. The default value for this property is
"org/jppf/job/persistence/impl/job_persistence.sql", which points to a file in the classpath that can be found in the
jppf-common-x.y.z.jar file.
4.12.4.3 Asynchronous persistence (write-behind)
Asynchronous persistence is an asynchronous wrapper for any other job persistence implementation. It delegates the
persistence operations to this other implementation, and executes the delegated operations asynchronously via a pool of
threads. The corresponding implementation class is AsynchronousPersistence.
The methods of JobPersistence that do not return a result (void return type) are non-blocking and return immediately. All
other methods will block until the delegated operation is executed and its result is available. In particular, all operations
that store job data are executed some time in the future, which makes this implementation an effective "write-behind"
facility.
Asynchronous persistence can be configured as follows:
# shorten the configuration value for clarity
wrapper = org.jppf.job.persistence.impl.AsynchronousPersistence
# asynchronous persistence with a specified thread pool size
jppf.job.persistence = ${wrapper} <pool_size> <delegate persistence> ...
Where:
– "pool_size" is the size of the pool of threads used to execute the delegated operations. It can be omitted, in which case
it defaults to 1 (single-threaded).
– " ... " is the configuration of the delegate persistence implementation
Here is an example configuration for an asynchronous database persistence:
pkg = org.jppf.job.persistence.impl
# asynchronous database persistence with pool of 4 threads,
# a table named 'JPPF_JOBS' and datasource named 'JobDS'
jppf.job.persistence = ${pkg}.AsynchronousPersistence 4 \
${pkg}.DefaultDatabasePersistence JPPF_JOBS JobDS
Performance implications: the main goal of the asynchronous persistence is to minimize the impact of persistence on
performance, at the risk of a greater data loss in case of a driver crash. In scenarios where jobs are submitted and
executed faster than they are persisted, they will tend to accumulate in the thread pool's queue, with a risk of an out of
memory condition if the excessive load persists for too long.
To mitigate this possible issue, the asynchronous persistence monitors the heap usage. When heap usage reaches a
given threshold, it stops asynchronous operations and delegates directly to the underlying persistence implementation
instead, until the heap usage drops back under the threshold.
The heap usage threshold is the ratio used_heap / max_heap expressed as a percentage. It has a default value of
70% and can be set with the following configuration property:
jppf.job.persistence.memory.threshold = 60.0
4.12.4.4 Cacheable persistence
The cacheable persistence is a caching wrapper for any other job persistence implementation, whose corresponding
implementation class is CacheablePersistence.
The cached artifacts are those handled by the load() and store() methods, that is, job headers, data providers, tasks
and task results. The cache is an LRU cache of soft references to the artifacts. It guarantees that all its entries will be
garbage-collected before an out of memory error is raised. Additionally the cache has a capacity that can be specified in
the configuration and which defaults to 1024.
This cacheable persistence is configured as follows:
# shorten the configuration value for clarity
wrapper = org.jppf.job.persistence.impl.CacheablePersistence
# cacheable persistence with a specified capacity
jppf.job.persistence = ${wrapper} <capacity> <delegate persistence> ...
Where:
– "capacity" is the capacity of the cache, that is, the maximum number of entries it can hold at any time. If omitted, it
defaults to 1024.
– " ... " is the configuration of the delegate persistence implementation
Here is a concrete example wrapping a default database persistence:
# shortcut for the package name
pkg = org.jppf.job.persistence.impl
# cacheable database persistence with a capacity of 10000,
# a table named 'JPPF_JOBS' and datasource named 'JobDS'
jppf.job.persistence = ${pkg}.CacheablePersistence 10000 \
${pkg}.DefaultDatabasePersistence JPPF_JOBS JobDS
Note: since the job persistence facility, by its nature and design, performs mostly "write" operations, you will generally
see little or no benefit from using the cacheable persistence wrapper. You may see significant performance gains
essentially in situations where the persisted jobs are accessed multiple times by the client-side management facility.
Tip: it is possible to combine cacheable persistence with asynchronous persistence to wrap any concrete persistence
implementation. This is done by simply concatenating the class names and related arguments in the configuration, e.g.:
# shortcuts for package name and persistence implementations
pkg = org.jppf.job.persistence.impl
cacheable = ${pkg}.CacheablePersistence 1024
async = ${pkg}.AsynchronousPersistence 8
db = ${pkg}.DefaultDatabasePersistence JPPF_JOBS jobDS
# combine them to configure a cacheable asynchronous database persistence
jppf.job.persistence = ${cacheable} ${async} ${db}
4.12.5 Custom persistence implementations
Reference: custom implementations are fully detailed in a dedicated section of the customization chapter.
4.12.6 Class loading and classpath considerations
In a scenario where you want a job to be automatically resubmitted by the persistence facility, after a driver restart and
without any client connected, the nodes to which the tasks of the job are dispatched will need to be able to deserialize
these tasks and then execute them. For this, they will need to load all the classes required by these tasks, otherwise the
execution will fail. Normally, these classes would be downloaded from the client via the driver, however that is not
possible here, since there is no client.
To ensure that these classes can be loaded, they must be in a place accessible from either the nodes or the driver. Our
recommendation is to put the corresponding jar files and class folders in the driver's classpath, to ensure all the nodes
can access them.
Reference: for details on how class loading works in JPPF, please read the class loading documentation.
Similarly, when restoring jobs on the client side with the JPPFDriverJobPersistence facility, you must ensure that all
required classes are available in the client classpath, to allow job elements to be deserialized properly.
4.12.7 Jobs persistence in multi-server topologies
In topologies that include multiple drivers, whether they communicate with each other and/or the JPPF client is connected
to one or more of them at once, there are scenarios that require special handling and care in order to guarantee the
integrity of the jobs in the persistence store.
4.12.7.1 Only the first peer persists the job
Consider a topology with 2 drivers communicating with each other. When a JPPF client submits a job to driver 1, and driver 1
delegates all or part of the job to driver 2, then only driver 1 will persist the job. This simplifies the handling of
persistence, avoids redundant persistence operations and generally increases performance.
4.12.7.2 Submitting a job via multiple driver connections
The client-side load balancer, combined with the job's client SLA maxChannels attribute, allows JPPF clients to submit
jobs via multiple connections in parallel. We distinguish 2 cases with regards to jobs persistence:
1) All connections point to the same driver. In this case no special handling is needed, because the same driver also
means the same persistence store.
2) The connections point to 2 or more separate drivers. In this scenario, two conditions must be met:
– all the drivers must point to the same persistence store
– the persistence store must provide some sort of transactionality or locking mechanism to protect against integrity
constraint violations. In effect, some elements of the job might be stored multiple times in parallel by multiple drivers and
the store must be protected against possible corruption. Relational databases will generally provide the transactional
capabilities to achieve this. For instance the built-in database persistence does, but the built-in file persistence does not.
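For instance, both drivers could declare the exact same database persistence in their respective configuration files, so
that they share a single transactional store:
# identical setting in the configuration of driver 1 and driver 2
jppf.job.persistence = org.jppf.job.persistence.impl.DefaultDatabasePersistence \
  JPPF_JOBS jobDS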
4.13 JPPF Executor Services
4.13.1 Basic usage
JPPF 2.2 introduced a new API that serves as an ExecutorService facade to the JPPF client API. This API consists of a
single class, JPPFExecutorService, which implements the interface java.util.concurrent.ExecutorService.
A JPPFExecutorService is obtained via its constructor, to which a JPPFClient must be passed:
JPPFClient jppfClient = new JPPFClient();
ExecutorService executor = new JPPFExecutorService(jppfClient);
The behavior of the resulting executor will depend largely on the configuration of the JPPFClient and on which
ExecutorService method you invoke to submit tasks. In effect, each time you invoke an invokeAll(...),
invokeAny(...), submit(...) or execute(...) method of the executor, a new JPPFJob will be created and sent
for execution on the grid. This means that, if the executor method you invoke only takes a single task, then a job with only
one task will be sent to the JPPF server.
Here is an example use:
JPPFClient jppfClient = new JPPFClient();
ExecutorService executor = new JPPFExecutorService(jppfClient);
try {
  // submit a single task
  Runnable myTask = new MyRunnable(0);
  Future<?> future = executor.submit(myTask);
  // wait for the results
  future.get();
  // ... process the results ...

  // submit a list of tasks
  List<MyRunnable> myTaskList = new ArrayList<>();
  for (int i=0; i<10; i++) myTaskList.add(new MyRunnable(i));
  List<Future<Object>> futureList = executor.invokeAll(myTaskList);
  // wait for the results
  for (Future<Object> f: futureList) f.get();
  // ... process the results for the list of tasks ...
} finally {
  // clean up after use
  executor.shutdown();
  jppfClient.close();
}

// !!! it is important that this task is Serializable !!!
public static class MyRunnable implements Runnable, Callable<Object>, Serializable {
  private int id = 0;

  public MyRunnable(int id) {
    this.id = id;
  }

  @Override public void run() {
    System.out.println("Running task id " + id);
  }

  // invokeAll() expects Callable tasks, so we also implement call()
  @Override public Object call() {
    run();
    return null;
  }
}
4.13.2 Batch modes
The executor's behavior can be modified by using one of the batch modes of the JPPFExecutorService. By batch
mode, we mean the ability to group tasks into batches, in several different ways. This enables tasks to be sent together,
even if they are submitted individually, and allows them to benefit from the parallel features inherent to JPPF. This will
also dramatically improve the throughput of individual tasks sent via an executor service.
Using a batch size: specifying a batch size via the method JPPFExecutorService.setBatchSize(int limit)
causes the executor to only send tasks when at least that number of tasks have been submitted. When using this mode,
you must be cautious as to how many tasks you send via the executor: if you send less than the batch size, these tasks
will remain pending and un-executed. Sometimes, the executor will send more than the specified number of tasks in the
same batch: this happens when one of the JPPFExecutorService.invokeXXX() methods is called with
n tasks, such that current batch size + n > limit. The behavior is to send all tasks included in the invokeXXX() call
together.
Here is an example:
JPPFExecutorService executor = new JPPFExecutorService(jppfClient);
// the executor will send jobs with at least 5 tasks each
executor.setBatchSize(5);
List<Future<?>> futures = new ArrayList<>();
// we submit 10 = 2 * 5 tasks, this will cause the client to send 2 jobs
for (int i=0; i<10; i++) futures.add(executor.submit(new MyTask(i)));
for (Future> f: futures) f.get();
Using a batch timeout: this is done via the method JPPFExecutorService.setBatchTimeout(long timeout)
and causes the executor to send the tasks at regular intervals, specified as the timeout. The timeout value is expressed in
milliseconds. Once the timeout has expired, the counter is reset to zero. If no task has been submitted between two
timeout expirations, then nothing happens.
Example:
JPPFExecutorService executor = new JPPFExecutorService(jppfClient);
// the executor will send a job every second (if any task is submitted)
executor.setBatchTimeout(1000L);
List<Future<?>> futures = new ArrayList<>();
// we submit 5 tasks
for (int i=0; i<5; i++) futures.add(executor.submit(new MyTask(i)));
// we wait 1.5 second, during that time a job with 5 tasks will be submitted
Thread.sleep(1500L);
// we submit 6 more tasks, they will be sent in a different job
for (int i=5; i<11; i++) futures.add(executor.submit(new MyTask(i)));
// here we get the results for tasks sent in 2 different jobs!
for (Future<?> f: futures) f.get();
Using both batch size and timeout: it is possible to use a combination of batch size and timeout. In this case, a job will
be sent whenever the batch limit is reached or the timeout expires, whichever happens first. In any case, the timeout
counter will be reset each time a job is sent. Using a timeout is also an efficient way to deal with the possible blocking
behavior of the batch size mode. In this case, just use a timeout that is sufficiently large for your needs.
Example:
JPPFExecutorService executor = new JPPFExecutorService(jppfClient);
executor.setBatchTimeout(1000L);
executor.setBatchSize(5);
List<Future<?>> futures = new ArrayList<>();
// we submit 3 tasks
for (int i=0; i<3; i++) futures.add(executor.submit(new MyTask(i)));
// we wait 1.5 second, during that time a job with 3 tasks will be submitted,
// even though the batch size is set to 5
Thread.sleep(1500L);
for (Future<?> f: futures) f.get();
4.13.3 Configuring jobs and tasks
The JPPFExecutorService has a limitation: if you use only the ExecutorService interface which it implements, there
is no way to use JPPF-specific features such as the job SLA, metadata or persistence, or the task timeout,
onTimeout() and onCancel() callbacks.
To overcome this limitation without breaking the semantics of ExecutorService, JPPFExecutorService provides a
way to specify the configuration of the jobs and tasks that will be submitted subsequently.
This can be done via the ExecutorServiceConfiguration interface, which can be accessed from a JPPFExecutorService
instance via the following accessor methods:
// Get the configuration for this executor service
public ExecutorServiceConfiguration getConfiguration();
// Reset the configuration for this executor service to a blank state
public ExecutorServiceConfiguration resetConfiguration();
ExecutorServiceConfiguration provides the following API:
// Get the configuration to use for the jobs submitted by the executor service
JobConfiguration getJobConfiguration();
// Get the configuration to use for the tasks submitted by the executor service
TaskConfiguration getTaskConfiguration();
4.13.3.1 Job configuration
The JobConfiguration interface is defined as follows:
public interface JobConfiguration {
// Get the service level agreement between the jobs and the server
JobSLA getSLA();
// Get the service level agreement between the jobs and the client
JobClientSLA getClientSLA();
// Get the user-defined metadata associated with the jobs
JobMetadata getMetadata();
// Get/set the persistence manager that enables saving and restoring the jobs state
JobPersistence getPersistenceManager();
void setPersistenceManager(final JobPersistence persistenceManager);
// Get/set the job's data provider
DataProvider getDataProvider();
void setDataProvider(DataProvider dataProvider);
// Add or remove a listener to/from the list of job listeners
void addJobListener(JobListener listener);
void removeJobListener(JobListener listener);
// get all the class loaders added to this job configuration
List<ClassLoader> getClassLoaders();
}
As we can see, this provides a way to set the properties normally available to JPPFJob instances, even though the jobs
submitted by a JPPFExecutorService are not directly visible. Any change to the JobConfiguration will apply to the
next job submitted by the executor and to all subsequent jobs.
Here is an example usage:
JPPFExecutorService executor = ...;
// get the executor configuration
ExecutorServiceConfiguration config = executor.getConfiguration();
// get the job configuration
JobConfiguration jobConfig = config.getJobConfiguration();
// set all jobs to expire after 5 seconds
jobConfig.getSLA().setJobExpirationSchedule(new JPPFSchedule(5000L));
// add a class loader that cannot be computed from the tasks
jobConfig.getClassLoaders().add(myClassLoader);
4.13.3.2 Task configuration
The TaskConfiguration interface can be used to set JPPF-specific properties onto executor service tasks that do not
implement Task. It is defined as follows:
public interface TaskConfiguration {
  // Get/set the delegate for the onCancel() method
  JPPFTaskCallback getOnCancelCallback();
  void setOnCancelCallback(final JPPFTaskCallback cancelCallback);
  // Get/set the delegate for the onTimeout() method
  JPPFTaskCallback getOnTimeoutCallback();
  void setOnTimeoutCallback(final JPPFTaskCallback timeoutCallback);
  // Get/set the task timeout schedule
  JPPFSchedule getTimeoutSchedule();
  void setTimeoutSchedule(final JPPFSchedule timeoutSchedule);
}
This API introduces the concept of a callback delegate, which is used in lieu of the “standard” JPPFTask callback
methods, Task.onCancel() and Task.onTimeout(). This is done by providing a subclass of JPPFTaskCallback, which is
defined as follows:
public abstract class JPPFTaskCallback implements Runnable, Serializable {
// Get the task this callback is associated with
public final Task getTask();
}
Here is a task configuration usage example:
JPPFExecutorService executor = ...;
// get the executor configuration
ExecutorServiceConfiguration config = executor.getConfiguration();
// get the task configuration
TaskConfiguration taskConfig = config.getTaskConfiguration();
// set the task to timeout after 5 seconds
taskConfig.setTimeoutSchedule(new JPPFSchedule(5000L));
// set the onTimeout() callback
taskConfig.setOnTimeoutCallback(new MyTaskCallback());
// A callback that sets a timeout message as the task result
static class MyTaskCallback extends JPPFTaskCallback {
@Override
public void run() {
getTask().setResult("this task has timed out");
}
}
4.13.4 JPPFCompletionService
The JDK package java.util.concurrent provides the interface CompletionService, which represents “a service that
decouples the production of new asynchronous tasks from the consumption of the results of completed tasks”. The JDK
also provides a concrete implementation with the class ExecutorCompletionService. Unfortunately, this class does not
work with a JPPFExecutorService, as it was not designed with distributed execution in mind.
As a convenience, the JPPF API provides a specific implementation of CompletionService with the class
JPPFCompletionService, which respects the contract and semantics defined by the CompletionService interface and
which can be used as follows:
JPPFExecutorService executor = ...;
JPPFCompletionService<String> completionService = new JPPFCompletionService<>(executor);
MyCallable task = new MyCallable();
Future<String> future = completionService.submit(task);
// ... later on ...
// block until a result is available
future = completionService.take();
String result = future.get();
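Note that the MyCallable class used here is not part of the JPPF API; a minimal sketch could look like the following, keeping in mind that, like MyRunnable earlier, it must be Serializable:
public static class MyCallable implements Callable<String>, Serializable {
  @Override
  public String call() throws Exception {
    // the returned value is just an illustration
    return "executed MyCallable";
  }
}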
4.14 Grid topology monitoring
As of JPPF v5.0, the package org.jppf.client.monitoring.topology provides an API to explore and monitor the
topology of a JPPF grid, as discovered from a JPPF client's perspective. This API is used by the administration and
monitoring console; it allows you to explore the topology, as well as subscribe to notifications of the changes that occur
in it, such as a new server being brought online, or nodes starting or terminating.
4.14.1 Building and exploring the topology
The entry point for this API is the TopologyManager class. A TopologyManager uses a JPPFClient to discover its
connections to one or more drivers, and queries these drivers via the management API to obtain information about their
attached JPPF nodes and other peer drivers they are connected to. All this information is used to build a model of the
JPPF grid topology as a tree. For example, with a grid that has two drivers connected to each other, we would have a
representation like this:
[Figure: a tree with the Topology Manager at the root, drivers A and B as its children, and under each driver its attached nodes plus a peer object referencing the other driver]
The dotted lines in the figure emphasize the fact that a peer is actually a "virtual" component which references a real
driver. Peer objects make it possible to represent the topology as a tree rather than as a graph, which makes its traversal
a lot easier.
You may notice that this is very similar to the "topology tree" view of the administration console. This is not a
coincidence: the administration console uses its own instance of TopologyManager, and the tree view is a direct
mapping of the topology to a tree-like Swing component.
The model representing the components of the JPPF topology is a class hierarchy whose base super class is the
abstract class AbstractTopologyComponent, which provides an API to navigate the tree structure, along with ways to
identify each component and methods to retrieve information that is common to drivers and nodes.
The object model is as follows:
AbstractComponent
  AbstractTopologyComponent
    TopologyDriver
    TopologyNode
      TopologyPeer
AbstractComponent provides the API to navigate a tree of elements uniquely identified with a uuid:
public abstract class AbstractComponent<E extends AbstractComponent<E>> {
  // Get the parent of this component
  public synchronized E getParent()
  // Get the number of children of this component
  public synchronized int getChildCount()
  // Get the children of this component
  public synchronized List<E> getChildren()
  // Get the child with the specified uuid
  public synchronized E getChild(final String uuid)
  // Get the uuid of this component
  public synchronized String getUuid()
  // Get a user-friendly representation of this topology component
  public String getDisplayName()
}
AbstractTopologyComponent is defined as follows:
public abstract class AbstractTopologyComponent
  extends AbstractComponent<AbstractTopologyComponent> {
  // Whether this object represents a driver
  public boolean isDriver()
  // Whether this object represents a node
  public boolean isNode()
  // Whether this object represents a peer driver
  public boolean isPeer()
  // Get the object describing the health of a node or driver
  public HealthSnapshot getHealthSnapshot()
  // Get the management information for this component
  public JPPFManagementInfo getManagementInfo()
  // Get a user-friendly representation of this component
  public String getDisplayName()
}
TopologyDriver provides the following additional methods:
public class TopologyDriver extends AbstractTopologyComponent {
  // Get the JMX connection wrapper
  public JMXDriverConnectionWrapper getJmx()
  // Get the driver connection
  public JPPFClientConnection getConnection()
  // Get a proxy to the MBean that forwards node management requests
  public JPPFNodeForwardingMBean getForwarder()
  // Get a proxy to the jobs monitoring MBean
  public DriverJobManagementMBean getJobManager()
  // Get a proxy to the diagnostics MBean for this driver
  public DiagnosticsMBean getDiagnostics()
  // Get the nodes attached to this driver as TopologyNode objects
  public List<TopologyNode> getNodes()
  // Get the peers connected to this driver as TopologyPeer objects
  public List<TopologyPeer> getPeers()
  // Get the nodes and peers connected to this driver
  public List<TopologyNode> getNodesAndPeers()
}
TopologyNode provides two specialized methods to query the node's state:
public class TopologyNode extends AbstractTopologyComponent {
  // Get the object describing the current state of a node
  public JPPFNodeState getNodeState()
  // Get the number of slaves for a master node (node provisioning)
  public int getNbSlaveNodes()
}
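As an illustration, the following sketch prints the latest known state of each node; it assumes a TopologyManager named manager, as created in the example further below:
// iterate over all discovered drivers and their attached nodes
for (TopologyDriver driver: manager.getDrivers()) {
  for (TopologyNode node: driver.getNodes()) {
    // the node state is refreshed periodically by the topology manager
    System.out.println(node.getDisplayName() + " state: " + node.getNodeState());
  }
}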
TopologyPeer, on the other hand, does not provide any additional method, since its uuid is all that is needed to look up the actual driver it represents.
The root of the tree is our TopologyManager, defined as follows:
public class TopologyManager implements ClientListener {
  // Create a topology manager with a new JPPFClient
  public TopologyManager()
  // Create a topology manager with the specified JPPFClient
  public TopologyManager(JPPFClient client)
  // Get the JPPF client
  public JPPFClient getJPPFClient()
  // Get all drivers
  public List<TopologyDriver> getDrivers()
  // Get all nodes
  public List<TopologyNode> getNodes()
  // Get all peers
  public List<TopologyPeer> getPeers()
  // Get a driver from its uuid
  public TopologyDriver getDriver(String uuid)
  // Get a node from its uuid
  public TopologyNode getNode(String uuid)
  // Get a peer from its uuid
  public TopologyPeer getPeer(String uuid)
  // Get a node or peer from its uuid
  public TopologyNode getNodeOrPeer(String uuid)
  // Get the number of drivers
  public int getDriverCount()
  // Get the number of nodes
  public int getNodeCount()
  // Get the number of peers
  public int getPeerCount()
  // Get and set the node filter
  public NodeSelector getNodeFilter()
  public void setNodeFilter(NodeSelector selector)
  // Get the nodes that are slaves of the specified master node, if any
  public List<TopologyNode> getSlaveNodes(String masterNodeUuid)
}
With this API, the following code example can be used to visit the entire tree:
// this creates a new JPPFClient accessible with manager.getJPPFClient()
TopologyManager manager = new TopologyManager();
// iterate over the discovered drivers
for (TopologyDriver driver: manager.getDrivers()) {
  // ... do something with the driver ...
  // iterate over the nodes and peers for this driver
  for (TopologyNode node: driver.getNodesAndPeers()) {
    if (node.isNode()) {
      // ... do something with the node ...
    } else { // node.isPeer() is true
      TopologyPeer peer = (TopologyPeer) node;
      // retrieve the actual driver the peer refers to
      TopologyDriver actualDriver = manager.getDriver(peer.getUuid());
      // ... do something with the peer ...
    }
  }
}
A TopologyManager instance also automatically refreshes the states of the nodes, along with the JVM health snapshots of
the drivers and nodes. The interval between refreshes is determined via the value of the following configuration
properties:
# refresh interval in millis for the grid topology.
# this is the interval between 2 successive runs of the task that refreshes the
# topology via JMX requests; defaults to 1000
jppf.admin.refresh.interval.topology = 1000
# refresh interval for the JVM health snapshots; defaults to 1000
# this is the interval between 2 successive runs of the task that refreshes
# the JVM health snapshots via JMX requests
jppf.admin.refresh.interval.health = 1000
These values can be set both in a configuration file and programmatically, for instance:
TypedProperties config = JPPFConfiguration.getProperties();
// refresh the nodes states every 3 seconds
config.setLong("jppf.admin.refresh.interval.topology", 3000L);
// refresh the JVM health snapshots every 5 seconds
config.setLong("jppf.admin.refresh.interval.health", 5000L);
TopologyManager manager = new TopologyManager();
4.14.2 Receiving notifications of changes in the topology
It is possible to subscribe to topology change events emitted by a TopologyManager, using an implementation of the
TopologyListener interface as parameter of the related constructors and methods in TopologyManager:
public class TopologyManager implements ClientListener {
  // Create a topology manager with a new JPPFClient
  // and add the specified listeners immediately
  public TopologyManager(TopologyListener...listeners)
  // Create a topology manager with the specified JPPFClient
  // and add the specified listeners immediately
  public TopologyManager(JPPFClient client, TopologyListener...listeners)
  // Add a topology change listener
  public void addTopologyListener(TopologyListener listener)
  // Remove a topology change listener
  public void removeTopologyListener(TopologyListener listener)
}
TopologyListener is defined as follows:
public interface TopologyListener extends EventListener {
// Called when a driver is discovered
void driverAdded(TopologyEvent event);
// Called when a driver is terminated
void driverRemoved(TopologyEvent event);
// Called when the state of a driver has changed
void driverUpdated(TopologyEvent event);
// Called when a node is added to a driver
void nodeAdded(TopologyEvent event);
// Called when a node is removed from a driver
void nodeRemoved(TopologyEvent event);
// Called when the state of a node has changed
void nodeUpdated(TopologyEvent event);
}
As we can see, each notification is encapsulated in a TopologyEvent object:
public class TopologyEvent extends EventObject {
// Get the driver data
public TopologyDriver getDriver()
// Get the related node or peer
public TopologyNode getNodeOrPeer()
// Get the topology manager which emitted this event
public TopologyManager getTopologyManager()
}
Please note that, for the driverAdded(...), driverRemoved(...) and driverUpdated(...) notifications, the
corresponding event's getNodeOrPeer() method will always return null.
Additionally, if you do not wish to override all the methods of the TopologyListener interface, you can instead extend the
class TopologyListenerAdapter, which provides an empty implementation of each method.
Here is a listener implementation that prints topology changes to the console:
public class MyListener extends TopologyListenerAdapter {
  @Override public void driverAdded(TopologyEvent e) {
    System.out.printf("added driver %s%n", e.getDriver().getDisplayName());
  }
  @Override public void driverRemoved(TopologyEvent e) {
    System.out.printf("removed driver %s%n", e.getDriver().getDisplayName());
  }
  @Override public void nodeAdded(TopologyEvent e) {
    TopologyNode node = e.getNodeOrPeer();
    System.out.printf("added %s %s to driver %s%n", nodeType(node),
      node.getDisplayName(), e.getDriver().getDisplayName());
  }
  @Override public void nodeRemoved(TopologyEvent e) {
    TopologyNode node = e.getNodeOrPeer();
    System.out.printf("removed %s %s from driver %s%n", nodeType(node),
      node.getDisplayName(), e.getDriver().getDisplayName());
  }
  private String nodeType(TopologyNode node) {
    return node.isNode() ? "node" : "peer";
  }
}
To subscribe this listener for topology changes:
TopologyManager manager = new TopologyManager();
manager.addTopologyListener(new MyListener());
or, in a single statement:
TopologyManager manager = new TopologyManager(new MyListener());
4.14.3 Node filtering
TopologyManager provides an API to filter the nodes in the grid topology, based on node selectors:
public class TopologyManager implements ClientListener {
// Get the node filter
public NodeSelector getNodeFilter()
// Set the node filter
public void setNodeFilter(NodeSelector selector)
}
To activate node filtering, you only need to call setNodeFilter() with a non-null NodeSelector. Conversely, calling
setNodeFilter() with a null selector deactivates node filtering. Here is an example which filters out all slave
nodes in the topology:
TopologyManager manager = ...;
// a policy that filters out slave nodes
ExecutionPolicy noSlavePolicy = new Equal("jppf.node.provisioning.slave", false);
NodeSelector selector = new ExecutionPolicySelector(noSlavePolicy);
// activate node filtering with our policy
manager.setNodeFilter(selector);
When node filtering is active:
• only the nodes that match the selector will be available via the TopologyManager API
• events will be emitted only for the nodes that match the selector
The administration console also provides a UI field where an execution policy in XML format can be entered and used as a filter.
4.15 Job monitoring API
In the same way that the topology monitoring API maintains a representation of the grid topology, the job monitoring API
maintains a representation of the jobs being processed within the grid, including which jobs are handled by each driver
and which nodes the jobs are dispatched to.
4.15.1 Job monitor and jobs hierarchy
The class JobMonitor maintains a representation of the jobs as a hierarchy with 3 levels: driver / job / job dispatch, as
shown in this picture:
[Figure: the jobs hierarchy as a tree — the Job Monitor at the root, drivers A and B as its children, jobs 1 through 4 held by the drivers, and under each job its dispatches to nodes 1 through 8]
Note that a JPPF job can be submitted to multiple drivers, either in parallel from the same client, or from one driver to
another in a multi-driver topology. JobMonitor provides an API to retrieve all the drivers a job is submitted to.
Each element of the jobs hierarchy has its own class, represented in this object model:
AbstractComponent
  AbstractJobComponent
    JobDriver
    Job
    JobDispatch
As we can see, the job elements share a common super class with the topology elements: AbstractComponent, which
provides the base API to navigate a tree of elements identified by their uuid.
JobMonitor provides the following API to navigate the job hierarchy:
public class JobMonitor extends TopologyListenerAdapter {
  // Get the driver with the specified uuid
  public JobDriver getJobDriver(String driverUuid)
  // Get the drivers monitored by this job monitor
  public List<JobDriver> getJobDrivers()
  // Get the drivers to which a job was submitted
  public List<JobDriver> getDriversForJob(String jobUuid)
  // Get the dispatches of the specified job across the entire topology.
  // This returns job dispatches with nodes that may be connected to different servers
  public List<JobDispatch> getAllJobDispatches(String jobUuid)
}
The class JobDriver is essentially a wrapper for a TopologyDriver element, which represents a topology hierarchy that is
orthogonal to the jobs hierarchy. It is defined as follows:
public class JobDriver extends AbstractJobComponent {
  // Get the proxy to the driver MBean that manages and monitors jobs
  public DriverJobManagementMBean getJobManager()
  // Get the associated driver from the topology manager
  public TopologyDriver getTopologyDriver()
  // Get a job handled by this driver from its uuid
  public Job getJob(final String jobUuid)
  // Get the list of jobs handled by this driver
  public List<Job> getJobs()
}
The Job class wraps a JobInformation object and provides information on the latest known state of a job, as seen by the
driver it is submitted to. It is defined as follows:
public class Job extends AbstractJobComponent {
  // Get the information on the job
  public JobInformation getJobInformation()
  // Get the driver that holds this job
  public JobDriver getJobDriver()
  // Get the dispatch with the specified node uuid for this job
  public JobDispatch getJobDispatch(String nodeUuid)
  // Get the job dispatches for this job
  public List<JobDispatch> getJobDispatches()
}
JobDispatch represents a subset of a job that was sent to a node for execution and is the association of a JobInformation
object with a TopologyNode:
public class JobDispatch extends AbstractJobComponent {
// Get the information on the node for this job dispatch
public TopologyNode getNode()
// Get the information on the job
public synchronized JobInformation getJobInformation()
// Get the job to which this dispatch belongs
public Job getJob()
}
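Putting these classes together, the whole jobs hierarchy can be visited with code similar to the following sketch, which assumes an initialized JobMonitor named monitor (see the next section for how to create one):
// iterate over the drivers, their jobs, and each job's dispatches
for (JobDriver driver: monitor.getJobDrivers()) {
  System.out.println("driver " + driver.getDisplayName());
  for (Job job: driver.getJobs()) {
    System.out.println("  job " + job.getDisplayName());
    for (JobDispatch dispatch: job.getJobDispatches()) {
      System.out.println("    dispatched to node " + dispatch.getNode().getDisplayName());
    }
  }
}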
4.15.2 Creating and configuring a job monitor
4.15.2.1 Constructors
To create and initialize a JobMonitor, you have to call one of its two constructors:
public class JobMonitor extends TopologyListenerAdapter {
  // Create this job monitor with the specified topology manager,
  // in IMMEDIATE_NOTIFICATIONS mode
  public JobMonitor(TopologyManager topologyManager, JobMonitoringListener...listeners)
  // Create this job monitor with the specified topology manager and update mode
  public JobMonitor(JobMonitorUpdateMode updateMode, long period,
    TopologyManager topologyManager, JobMonitoringListener...listeners)
}
Examples:
JPPFClient client = new JPPFClient();
TopologyManager tManager = new TopologyManager(client);
JobMonitoringListener listener1 = new JobMonitoringListenerAdapter() { ... };
JobMonitoringListener listener2 = new JobMonitoringListenerAdapter() { ... };
JobMonitoringListener[] listenerArray = { listener1, listener2 };
JobMonitor monitor;
monitor = new JobMonitor(tManager);
monitor = new JobMonitor(tManager, listener1);
monitor = new JobMonitor(tManager, listenerArray);
monitor = new JobMonitor(JobMonitorUpdateMode.IMMEDIATE_NOTIFICATIONS, -1L, tManager);
monitor = new JobMonitor(JobMonitorUpdateMode.POLLING, 1000L, tManager, listener1, listener2);
Remarks:
1) A call to:
new JobMonitor(topologyManager, listenerArray)
is equivalent to:
new JobMonitor(JobMonitorUpdateMode.IMMEDIATE_NOTIFICATIONS, -1L, topologyManager, listenerArray)
2) The vararg parameter listeners is optional and need not be specified if you do not wish to register a job monitoring
listener at construction time.
3) As we can see, a JobMonitor requires a TopologyManager to work with, since it will use its TopologyDriver and
TopologyNode objects to construct the JobDriver and JobDispatch instances in the jobs hierarchy.
4.15.2.2 Update modes
As we have seen, a JobMonitor is given an update mode at construction time, whether implicitly or explicitly. The
update mode is one of the elements of the JobMonitorUpdateMode enum, defined as follows:
public enum JobMonitorUpdateMode {
  // Updates are computed by polling job information from the drivers
  // at regular intervals
  POLLING,
  // Updates are computed from JMX notifications and pushed immediately
  // as job monitor events. This means one event for each JMX notification
  IMMEDIATE_NOTIFICATIONS,
  // Updates are computed from JMX notifications and published periodically as job
  // monitor events. Notifications are merged/aggregated in the interval between
  // publications
  DEFERRED_NOTIFICATIONS
}
In POLLING and DEFERRED_NOTIFICATIONS modes, updates are published periodically as events, hence the need for
the period parameter in the constructor, which represents the interval between publications in milliseconds. There is,
however, a significant difference between these two modes: in POLLING mode, the period represents the interval
between two JMX requests to the driver(s). Since this involves network communication, it can be a slow operation, and
the value of period should not be too small, otherwise the job monitor may not be able to cope. A period of 1000L (1
second) or more is recommended. In DEFERRED_NOTIFICATIONS mode, the information to publish is already present
in memory and the period represents the interval between two publications as events. Thus the period can be significantly
smaller than in POLLING mode. For instance, when displaying the updates in a desktop GUI, you might want to make
sure that the user's eyes perceive them as continuous updates. For this you need at least 25 updates per
second, which means a period of 40 milliseconds or less.
In IMMEDIATE_NOTIFICATIONS mode, updates are immediately pushed as events, therefore no period parameter is
needed and the value used in the constructor is simply ignored.
To conclude, the purpose of the update modes is to provide options for the tradeoff between the accuracy of the job
updates and the ability for the application to cope with the generated workload. For instance, a GUI application may not
need to display all the updates, in which case the POLLING mode would be sufficient. If you need to receive all updates,
then IMMEDIATE_NOTIFICATIONS is the mode to use. Finally, please also note that POLLING is the mode which
generates the least network traffic.
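For instance, a GUI refreshing at the 25 updates per second mentioned above could be set up with the following sketch, which reuses the tManager from the earlier examples:
// aggregate JMX notifications and publish them every 40 ms (about 25 updates/s)
JobMonitor monitor =
  new JobMonitor(JobMonitorUpdateMode.DEFERRED_NOTIFICATIONS, 40L, tManager);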
4.15.3 Receiving job monitoring events
JobMonitor allows registering one or more listeners for job monitoring events, either from its constructors, as we have
seen in the previous section, or after construction time, with the following API:
public class JobMonitor extends TopologyListenerAdapter {
// Initialize with the specified topology manager and listeners
public JobMonitor(TopologyManager topologyManager, JobMonitoringListener...listeners)
// Initialize with the specified topology manager, event mode and listeners
public JobMonitor(JobMonitorUpdateMode updateMode, long period, TopologyManager topologyManager,
JobMonitoringListener...listeners)
// Add a listener to the events emitted by this job monitor
public void addJobMonitoringListener(JobMonitoringListener listener)
// Remove a listener to the events emitted by this job monitor
public void removeJobMonitoringListener(JobMonitoringListener listener)
}
A listener is an implementation of the JobMonitoringListener interface, which provides the following notification methods:
public interface JobMonitoringListener extends EventListener {
// Called when a new driver is added to the topology
void driverAdded(JobMonitoringEvent event);
// Called when a driver is removed from the topology
void driverRemoved(JobMonitoringEvent event);
// Called when a job is added to the driver queue
void jobAdded(JobMonitoringEvent event);
// Called when a job is removed from the driver queue
void jobRemoved(JobMonitoringEvent event);
// Called when the state of a job has changed
void jobUpdated(JobMonitoringEvent event);
// Called when a job is dispatched to a node
void jobDispatchAdded(JobMonitoringEvent event);
// Called when a job dispatch returns from a node
void jobDispatchRemoved(JobMonitoringEvent event);
}
If you do not need to implement all the methods, you may instead extend the JobMonitoringListenerAdapter class, which
provides an empty implementation of each method in JobMonitoringListener.
These methods provide an input parameter of type JobMonitoringEvent:
public class JobMonitoringEvent extends EventObject {
  // Get the job monitor which emitted this event
  public JobMonitor getJobMonitor()
  // Get the job driver from which this event originates
  public JobDriver getJobDriver()
  // Get the related job, if any
  public Job getJob()
  // Get the related job dispatch, if any
  public JobDispatch getJobDispatch()
}
Note that getJob() and getJobDispatch() may return null, depending on the notification method called:
- getJob() will return null for the driverAdded() and driverRemoved() notifications
- getJobDispatch() only returns a value for the jobDispatchAdded() and jobDispatchRemoved() notifications.
As an example, the following code sample prints all the job monitoring events to the system console:
public class MyJobMonitoringListener implements JobMonitoringListener {
@Override public void driverAdded(JobMonitoringEvent event) {
print("driver ", event.getJobDriver(), " added");
}
@Override public void driverRemoved(JobMonitoringEvent event) {
print("driver ", event.getJobDriver(), " removed");
}
@Override public void jobAdded(JobMonitoringEvent event) {
print("job ", event.getJob(), " added to driver ", event.getJobDriver());
}
@Override public void jobRemoved(JobMonitoringEvent event) {
print("job ", event.getJob(), " removed from driver ", event.getJobDriver());
}
@Override public void jobUpdated(JobMonitoringEvent event) {
print("job ", event.getJob(), " updated");
}
@Override public void jobDispatchAdded(JobMonitoringEvent event) {
print("job ", event.getJob(), " dispatched to node ", event.getJobDispatch());
}
@Override public void jobDispatchRemoved(JobMonitoringEvent event) {
print("job ", event.getJob(), " returned from node ", event.getJobDispatch());
}
private void print(Object...args) {
StringBuilder sb = new StringBuilder();
for (Object o: args) {
if (o instanceof AbstractComponent)
sb.append(((AbstractComponent) o).getDisplayName());
else sb.append(o);
}
System.out.println(sb.toString());
}
}
TopologyManager tManager = ...;
// register the listener with the job monitor
JobMonitor monitor = new JobMonitor(tManager, new MyJobMonitoringListener());
// or, later on
monitor.addJobMonitoringListener(new MyJobMonitoringListener());
4.16 The JPPF statistics API
4.16.1 API description
The statistics in JPPF are handled with objects of type JPPFStatistics, which are groupings of JPPFSnapshot objects,
each snapshot representing a value that is constantly monitored, for instance the number of nodes connected to a server,
the number of jobs in the server queue, etc.
JPPFSnapshot exposes the following API:
public interface JPPFSnapshot extends Serializable {
// Get the total cumulated sum of the values
double getTotal();
// Get the latest observed value
double getLatest();
// Get the smallest observed value
double getMin();
// Get the peak observed value
double getMax();
// Get the average value
double getAvg();
// Get the label for this snapshot
String getLabel();
// Get the count of values added to this snapshot
long getValueCount();
}
The label of a snapshot is expected to be unique and enables identifying it within a JPPFStatistics object.
JPPF implements three different types of snapshots, each with a different semantics for the getLatest() method:
– CumulativeSnapshot: in this implementation, getLatest() is computed as the cumulated sum of all values added to
the snapshot. If values are only added, and not removed, then it will always return the same value as getTotal().
– NonCumulativeSnapshot: here, getLatest() is computed as the average of the latest set of values that were added,
or the latest value if only one was added.
– SingleValueSnapshot: in this implementation, only getTotal() is actually computed, all other methods return 0.
A JPPFStatistics object allows exploring the snapshots it contains, by exposing the following methods:
public class JPPFStatistics implements Serializable, Iterable<JPPFSnapshot> {
  // Get a snapshot specified by its label
  public JPPFSnapshot getSnapshot(String label)
  // Get all the snapshots in this object
  public Collection<JPPFSnapshot> getSnapshots()
  // Get the snapshots in this object using the specified filter
  public Collection<JPPFSnapshot> getSnapshots(Filter filter)
  @Override
  public Iterator<JPPFSnapshot> iterator()
}

// A filter interface for snapshots
public interface Filter {
  // Determines whether the specified snapshot is accepted by this filter
  boolean accept(JPPFSnapshot snapshot);
}
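For example, assuming the Filter interface shown above is visible (e.g. imported), the following sketch retrieves only the snapshots whose label starts with an illustrative "node." prefix:
JPPFStatistics stats = ...;
// keep only the snapshots whose label starts with "node."
Collection<JPPFSnapshot> nodeSnapshots = stats.getSnapshots(new Filter() {
  @Override
  public boolean accept(final JPPFSnapshot snapshot) {
    return snapshot.getLabel().startsWith("node.");
  }
});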
Note that, since it implements Iterable<JPPFSnapshot>, a JPPFStatistics object can be used directly in a for loop:
JPPFStatistics stats = ...;
for (JPPFSnapshot snapshot: stats) {
System.out.println("got '" + snapshot.getLabel() + "' snapshot");
}
Currently, only the JPPF driver holds and maintains a JPPFStatistics instance. It can be obtained directly with:
JPPFStatistics stats = JPPFDriver.getInstance().getStatistics();
It can also be obtained remotely via the management APIs, as described in the section Management and monitoring >
Server management > Server-level management and monitoring > Server statistics of this documentation.
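For instance, combining this with the grid topology monitoring API of section 4.14, a client might fetch the server statistics through a driver's JMX connection wrapper; the statistics() method used below is assumed from those management APIs:
TopologyDriver driver = ...;
// query the current server statistics via the driver's JMX connection
JPPFStatistics stats = driver.getJmx().statistics();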
Additionally, the class JPPFStatisticsHelper holds a set of constant definitions for the labels of all the snapshots
currently used in JPPF, along with a number of utility methods to ease the use of statistics:
public class JPPFStatisticsHelper {
  // Count of tasks dispatched to nodes
  public static String TASK_DISPATCH = "task.dispatch";
  // ... other constant definitions ...
  // Determine whether the specified snapshot is a single value snapshot
  public static boolean isSingleValue(JPPFSnapshot snapshot)
  // Determine whether the specified snapshot is a cumulative snapshot
  public static boolean isCumulative(JPPFSnapshot snapshot)
  // Determine whether the specified snapshot is a non-cumulative snapshot
  public static boolean isNonCumulative(JPPFSnapshot snapshot)
  // Get the translation of the label of a snapshot in the current locale
  public static String getLocalizedLabel(JPPFSnapshot snapshot)
  // Get the translation of the label of a snapshot in the specified locale
  public static String getLocalizedLabel(JPPFSnapshot snapshot, Locale locale)
}
The getLocalizedLabel() methods provide a short, localized description of what the snapshot is.
Note: at this time, only English translations are available.
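As an illustration, the following sketch prints, for each snapshot, its localized label, its type as determined by the helper methods above, and its total:
JPPFStatistics stats = ...;
for (JPPFSnapshot snapshot: stats) {
  // classify the snapshot using the JPPFStatisticsHelper methods
  String type = JPPFStatisticsHelper.isSingleValue(snapshot) ? "single value"
    : JPPFStatisticsHelper.isCumulative(snapshot) ? "cumulative" : "non-cumulative";
  System.out.println(JPPFStatisticsHelper.getLocalizedLabel(snapshot)
    + " (" + type + "): total = " + snapshot.getTotal());
}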
4.16.2 Statistics snapshots reference by type
The table below provides a list of the existing server statistics snapshots grouped by type. The snapshot names
correspond to the names of the constants defined in JPPFStatisticsHelper.
To obtain the corresponding JPPFSnapshot from a JPPFStatistics object, you can write code similar to the following:
JPPFStatistics stats = ...;
JPPFSnapshot snapshot = stats.getSnapshot(JPPFStatisticsHelper.NODE_EXECUTION);
Cumulative: TASK_QUEUE_COUNT, JOB_COUNT, NODES, IDLE_NODES, CLIENTS

Non cumulative: EXECUTION, NODE_EXECUTION, TRANSPORT_TIME, TASK_QUEUE_TIME, JOB_TIME, JOB_TASKS, TASK_DISPATCH, NODE_CLASS_REQUESTS_TIME, CLIENT_CLASS_REQUESTS_TIME

Single value: TASK_QUEUE_TOTAL, JOB_TOTAL, NODE_IN_TRAFFIC, NODE_OUT_TRAFFIC, CLIENT_IN_TRAFFIC, CLIENT_OUT_TRAFFIC, PEER_IN_TRAFFIC, PEER_OUT_TRAFFIC, UNIDENTIFIED_IN_TRAFFIC, UNIDENTIFIED_OUT_TRAFFIC
4.17 The Location API
4.17.1 Definition
This API allows developers to easily write data to, and read data from, various sources: the JVM heap, the file system, a
URL or Maven Central artifacts. It is based on the Location interface, which provides the following methods:
public interface Location<T> {
  // Copy the content at this location to another location
  Location copyTo(Location location);
  // Obtain an input stream to read from this location
  InputStream getInputStream();
  // Obtain an output stream to write to this location
  OutputStream getOutputStream();
  // Get this location's path
  T getPath();
  // Get the size of the data this location points to
  long size();
  // Get the content at this location as an array of bytes
  byte[] toByteArray() throws Exception;
}
4.17.2 Built-in implementations
4.17.2.1 FileLocation
FileLocation represents a path in the file system.
Usage examples:
// create a location pointing to the source file:
Location src = new FileLocation("/home/user/docs/some_file.txt");
// copy the source file to a new location and get the new location
Location dest = src.copyTo(new FileLocation("/home/user/other_file.txt"));
4.17.2.2 URLLocation
URLLocation can be used to transfer data to and from a URL, including HTTP and FTP URLs.
Example:
// create a location pointing to a file on an FTP server
Location src = new URLLocation("ftp://user:password@ftp.host.org/path/myFile.txt");
// download the file from the FTP server to the local file system
Location dest = src.copyTo(new FileLocation("/home/user/destFile.txt"));
4.17.2.3 MavenCentralLocation
MavenCentralLocation is an extension of URLLocation which allows downloading Maven artifacts from the Maven central
repository. Contrary to other Location implementations, this one does not permit uploading data. To enforce this, its
getOutputStream() method will always throw an UnsupportedOperationException.
Examples:
// create a location that points to a Maven artifact with default "jar" packaging
Location jar = new MavenCentralLocation("org.jppf:jppf-client:5.2.9");
// copy the artifact to the file system
jar.copyTo(new FileLocation("lib/jppf-client-5.2.9.jar"));
// create a location that points to a Maven artifact with "war" packaging
Location war = new MavenCentralLocation("org.jppf:jppf-admin-web:6.0", "war");
// copy the artifact to the file system
war.copyTo(new FileLocation("tomcat-7.0/webapps/JPPFWebAdmin.war"));
Note: instances of MavenCentralLocation only allow access to a single artifact and do not in any way handle Maven
transitive dependencies.
4.17.2.4 MemoryLocation
MemoryLocation represents a block of data in memory that can be copied from or sent to another location.
Example:
MyClass object = ...;
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos)) {
// serialize the object
oos.writeObject(object);
oos.flush();
// store the serialized object in a memory location
Location dataLocation = new MemoryLocation(baos.toByteArray());
// copy the serialized object to a file
dataLocation.copyTo(new FileLocation("/home/user/store/object.ser"));
} catch (Exception e) {
e.printStackTrace();
}
4.17.3 Related reference
The location API is notably used to specify a classpath in a job SLA.
5 Configuration guide
A JPPF grid is composed of many distributed components interacting with each other, often in different environments.
While JPPF will work in most environments, the default behavior may not be appropriate or adapted to some situations.
Much of the behavior in JPPF components can thus be modified, fine-tuned or sometimes even disabled, via numerous
configuration properties. These properties apply to many mechanisms and behaviors in JPPF, including:
• network communication
• management and monitoring
• performance / load-balancing
• failover and recovery
Any configuration property has a default value which is used when the property is not specified, and which should work in
most environments. In practice, this means that JPPF can work without any explicitly specified configuration at all.
For a full list of the JPPF configuration properties, do not hesitate to read the chapter Appendix A: configuration
properties reference of this manual.
5.1 Configuration file specification and lookup
All JPPF components work with a set of configuration properties. The format of these properties is as specified in the
Java Properties class. To enable a JPPF component to retrieve these properties, their source must be specified using
one of the two, mutually exclusive, system properties:
– jppf.config.plugin = class_name, where class_name is the fully qualified name of a class implementing either
the interface JPPFConfiguration.ConfigurationSource, or the interface JPPFConfiguration.ConfigurationSourceReader,
enabling a configuration source from any origin, such as a URL, a distributed file system, a remote storage facility, a
database, etc.
– jppf.config = path, where path is the location of the configuration file, either on the file system, or relative to the
JVM's classpath root. If this system property is not specified, JPPF will look for a default file named "jppf.properties" in
the current directory or in the classpath root.
Example use:
java -Djppf.config.plugin=my.own.Configuration ...
or
java -Djppf.config=my/folder/myFile.properties ...
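As an illustration, here is a minimal sketch of such a configuration plugin; the class name my.own.Configuration matches the example above, while the classpath resource it reads from is a pure assumption:
package my.own;

import java.io.*;
import org.jppf.utils.JPPFConfiguration;

// reads the JPPF configuration from a resource in the classpath
public class Configuration implements JPPFConfiguration.ConfigurationSource {
  @Override
  public InputStream getPropertyStream() throws IOException {
    // "my-config/jppf.properties" is a hypothetical resource name
    return getClass().getClassLoader().getResourceAsStream("my-config/jppf.properties");
  }
}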
The configuration file lookup mechanism is as follows:
1. if jppf.config.plugin is specified
a) instantiate an object of the specified class name and read the properties via the stream provided by this object's
getPropertyStream() or getPropertyReader() method, depending on which interface it implements.
b) if, for any reason, the stream cannot be obtained or reading the properties from it fails, go to 3.
2. else if jppf.config is specified
a) look for the file in the file system
b) if not found in the file system, look in the classpath
c) if not found in the classpath use default configuration values
3. if jppf.config is not specified
a) use default file "jppf.properties"
b) look for "jppf.properties" in the file system
c) if not found in the file system, look for it in the classpath
d) if not found in the classpath use default configuration values
A practical side effect of this mechanism is that it allows us to place a configuration file in the classpath, for instance
packaged in a jar file, and override it if needed with an external file, since the file system is always looked up first.
5.2 Includes, substitutions and scripted values in the configuration
5.2.1 Includes
A JPPF configuration source, whether as a file or as a plugin, can include other configuration sources by adding one or
more “#!include” statements in the following format:
#!include source_type source_path
The possible values for source_type are “file”, “url” or “class”. For each of these values, source_path has a different
meaning:
- when source_type is “file”, source_path is the path to a file on the file system or in the JVM's classpath. If the file
exists in both the classpath and the file system, the file system has priority over the classpath. Relative paths are
interpreted as relative to the JVM's current user directory, as determined by System.getProperty(“user.dir”).
- when source_type is “url”, source_path is a URL pointing to a configuration file. It must be a URL such that the JVM
can open a stream from it.
- when source_type is “class”, source_path is the fully qualified class name of a configuration plugin, i.e. an
implementation of either JPPFConfiguration.ConfigurationSource or JPPFConfiguration.ConfigurationSourceReader.
Examples:
# a config file in the file system
#!include file /home/me/jppf/jppf.properties
# a config file in the classpath
#!include file META-INF/jppf.properties
# a config file obtained from a url
#!include url http://www.myhost.com/jppf/jppf.properties
# a config file obtained from a configuration plugin
#!include class myPackage.MyConfigurationSourceReader
Includes can be nested without any limit on the nesting level. Thus, you need to be careful not to introduce cycles in your
includes. If that happens, JPPF will catch the resulting StackOverflowError and display an error message in the
console output and in the log.
5.2.2 Substitutions in the values of configuration properties
The JPPF configuration can handle a syntax of the form “propertyName = prefix${otherPropertyName}suffix”, where
prefix and suffix are arbitrary strings and ${otherPropertyName} is a placeholder referencing another property, whose
value will be substituted to the placeholder. If you have experience with Apache Ant, this syntax is very similar to the way
Ant properties are used in a build script.
Let's take an example illustrating how this works. The following property definitions:
prop.1 = value1
prop.2 = ${prop.1}
prop.3 = value3 ${prop.2}
prop.4 = value4 ${prop.2} + ${prop.3}
will be resolved into:
prop.1 = value1
prop.2 = value1
prop.3 = value3 value1
prop.4 = value4 value1 + value3 value1
Note 1: the order in which the properties are defined has no impact on the resolution of substitutions.
Note 2: substitutions are resolved after all includes are fully resolved and loaded.
Unresolved substitutions
A referenced property is unresolvable if, and only if, it refers to a property that is not defined or is involved in a resolution
cycle. In this case, the unresolved reference keeps the literal syntax of its initial definition, that is, the literal form
“${otherPropertyName}”. Let's illustrate this with examples.
In the following configuration:
prop.1 = ${prop.2}
the value of “prop.1” will remain “${prop.2}”, since the property “prop.2” is not defined.
In this configuration:
prop.1 = ${prop.2}
prop.2 = ${prop.3} ${prop.1}
prop.3 = value3
“prop.1” and “prop.2” introduce an unresolvable cycle, and only the reference to “prop.3” can be fully resolved. According
to this, the final values will be:
prop.1 = ${prop.2}
prop.2 = value3 ${prop.1}
prop.3 = value3
Environment variable substitutions
Any reference to a property name in the form “env.variableName” will be substituted with the value of the environment
variable whose name is “variableName”. For example, if the environment variable JAVA_HOME is defined as
“/opt/java/jdk1.7.0”, then the following configuration:
prop.1 = ${env.JAVA_HOME}/bin/java
will be resolved into:
prop.1 = /opt/java/jdk1.7.0/bin/java
If the value of a property refers to an undefined environment variable, then the reference will be replaced with the literal
syntax in its initial definition, that is in the literal form “${env.