Skip to content
This repository was archived by the owner on Apr 4, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.falcon.entity.v0.process.Property;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.util.DateUtil;
import org.apache.falcon.util.OozieUtils;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.hadoop.fs.Path;
import org.joda.time.format.DateTimeFormat;
Expand Down Expand Up @@ -69,7 +70,7 @@ public java.util.Properties build(Cluster cluster,

DateUtil.setTimeZone(entity.getTimezone().getID());
ExpressionHelper.setReferenceDate(new Date(getNominalTime().getMillis()));
elProps.putAll(getInputProps(cluster));
elProps.putAll(getInputProps(cluster, suppliedProps));
elProps.putAll(getOutputProps());
elProps.putAll(evalProperties());
Properties buildProps = build(cluster, buildPath);
Expand Down Expand Up @@ -144,7 +145,7 @@ private Properties getOutputProps() throws FalconException {
return props;
}

private Properties getInputProps(Cluster clusterObj) throws FalconException {
private Properties getInputProps(Cluster clusterObj, Properties suppliedProps) throws FalconException {
Properties props = new Properties();

if (entity.getInputs() == null) {
Expand All @@ -168,47 +169,12 @@ private Properties getInputProps(Cluster clusterObj) throws FalconException {
falconInputNames.add(input.getName());
falconInputFeedStorageTypes.add(storage.getType().name());
String partition = input.getPartition();

String startTimeExp = input.getStart();
String endTimeExp = input.getEnd();
ExpressionHelper.setReferenceDate(new Date(getNominalTime().getMillis()));
Date startTime = EXPRESSION_HELPER.evaluate(startTimeExp, Date.class);
Date endTime = EXPRESSION_HELPER.evaluate(endTimeExp, Date.class);

for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
if (!EntityUtil.responsibleFor(clusterEntity.getColo())) {
continue;
}

List<Location> locations = FeedHelper.getLocations(cluster, feed);
for (Location loc : locations) {
if (loc.getType() != LocationType.DATA) {
continue;
}
List<String> paths = new ArrayList<>();
List<Date> instanceTimes = EntityUtil.getEntityInstanceTimes(feed, cluster.getName(),
startTime, endTime); // test when startTime and endTime are equal.
for (Date instanceTime : instanceTimes) {
String path = EntityUtil.evaluateDependentPath(loc.getPath(), instanceTime);
if (StringUtils.isNotBlank(partition)) {
if (!path.endsWith("/") && !partition.startsWith("/")) {
path = path + "/";
}
path = path + partition;
}
path = getStoragePath(path);
paths.add(path);
}
if (loc.getType() != LocationType.DATA) {
props.put(input.getName() + "." + loc.getType().toString().toLowerCase(),
StringUtils.join(paths, ","));
} else {
props.put(input.getName(), StringUtils.join(paths, ","));
}
falconInputPaths.add(StringUtils.join(paths, ","));
}
if (suppliedProps!= null
&& suppliedProps.containsKey(OozieUtils.FALCON_PROCESS_INPUT_PATHS + "." + input.getName())) {
String paths = suppliedProps.getProperty(OozieUtils.FALCON_PROCESS_INPUT_PATHS
+ "." + input.getName());
List<String> inPaths = getInPaths(paths, partition);
falconInputPaths.addAll(inPaths);
}
}
props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), StringUtils.join(falconInputFeeds, "#"));
Expand All @@ -219,4 +185,22 @@ private Properties getInputProps(Cluster clusterObj) throws FalconException {
return props;
}

/**
 * Appends a partition to a path.
 *
 * A single "/" is inserted between the two parts when the path does not already end
 * with one and the partition does not already start with one. A blank partition
 * (as defined by StringUtils.isNotBlank) leaves the path untouched.
 *
 * @param path      path to extend
 * @param partition partition to append; may be null or blank
 * @return the path followed by the partition, or the unchanged path when the partition is blank
 */
private String addPartition(String path, String partition) {
    // Nothing to append.
    if (!StringUtils.isNotBlank(partition)) {
        return path;
    }
    boolean separatorPresent = path.endsWith("/") || partition.startsWith("/");
    String separator = separatorPresent ? "" : "/";
    return path + separator + partition;
}

/**
 * Expands a comma separated list of paths into individual storage paths.
 *
 * Each entry is passed through addPartition with the given partition and the result
 * is then handed to getStoragePath.
 *
 * @param paths     comma separated paths; must not be null
 * @param partition partition appended to every path; may be null or blank
 * @return one storage path per entry of {@code paths}, in the same order
 */
private List<String> getInPaths(String paths, String partition) {
    // Return type is parameterised (was the raw type List) so callers keep compile-time type safety.
    List<String> inPaths = new ArrayList<>();
    for (String path : paths.split(",")) {
        String partitionedPath = addPartition(path, partition);
        inPaths.add(getStoragePath(partitionedPath));
    }
    return inPaths;
}
}
1 change: 1 addition & 0 deletions oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
* Help methods relating to oozie configuration.
*/
public final class OozieUtils {
public static final String FALCON_PROCESS_INPUT_PATHS = "falcon.system.inpaths";
public static final JAXBContext WORKFLOW_JAXB_CONTEXT;
public static final JAXBContext ACTION_JAXB_CONTEXT;
public static final JAXBContext COORD_JAXB_CONTEXT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.falcon.notification.service.impl.DataAvailabilityService;
import org.apache.falcon.predicate.Predicate;
import org.apache.falcon.state.InstanceID;
import org.apache.falcon.util.OozieUtils;
import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.workflow.engine.DAGEngine;
import org.apache.falcon.workflow.engine.DAGEngineFactory;
Expand Down Expand Up @@ -122,67 +123,119 @@ public void registerForNotifications(boolean isResume) throws FalconException {
return;
}
for (Input input : process.getInputs().getInputs()) {
// Register for notification for every required input
if (input.isOptional()) {
continue;
}
Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed());
String startTimeExp = input.getStart();
String endTimeExp = input.getEnd();
SchedulerUtil.validateELExpType(startTimeExp, endTimeExp, input.getName());
DateTime processInstanceTime = getInstanceTime();
expressionHelper.setReferenceDate(new Date(processInstanceTime.getMillis()));

Date startTime = expressionHelper.evaluate(startTimeExp, Date.class);
Date endTime = expressionHelper.evaluate(endTimeExp, Date.class);
Date startTime = null, endTime = null;
SchedulerUtil.EXPTYPE exptype = SchedulerUtil.getExpType(startTimeExp);
if (exptype == SchedulerUtil.EXPTYPE.ABSOLUTE) {
expressionHelper.setReferenceDate(new Date(processInstanceTime.getMillis()));
startTime = expressionHelper.evaluate(startTimeExp, Date.class);
endTime = expressionHelper.evaluate(endTimeExp, Date.class);
SchedulerUtil.validateStartAndEndTime(startTime, endTime);
} else {
SchedulerUtil.validateStartEndForNonAbsExp(startTimeExp, endTimeExp, input.getName(),
process.getName());
}

for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
if (!EntityUtil.responsibleFor(clusterEntity.getColo())) {
continue;
}
List<Path> paths = new ArrayList<>();
List<Location> locations = FeedHelper.getLocations(cluster, feed);
for (Location loc : locations) {
if (loc.getType() != LocationType.DATA) {

if (exptype == SchedulerUtil.EXPTYPE.ABSOLUTE) {
List<Path> paths = getPaths(cluster, feed, startTime, endTime);
Predicate predicate = Predicate.createDataPredicate(paths.size());
// To ensure we evaluate only predicates not evaluated before when an instance is resumed.
if (isResume && !awaitedPredicates.contains(predicate)) {
continue;
}
List<Date> instanceTimes = EntityUtil.getEntityInstanceTimes(feed, cluster.getName(),
startTime, endTime);
for (Date instanceTime : instanceTimes) {
String path = EntityUtil.evaluateDependentPath(loc.getPath(), instanceTime);
if (feed.getAvailabilityFlag() != null && !feed.getAvailabilityFlag().isEmpty()) {
if (!path.endsWith("/")) {
path = path + "/";
}
path = path + feed.getAvailabilityFlag();
}
if (!paths.contains(new Path(path))) {
paths.add(new Path(path));
}
addDataPredicate(predicate);
DataAvailabilityService.DataRequestBuilder requestBuilder =
(DataAvailabilityService.DataRequestBuilder)
NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.DATA)
.createRequestBuilder(executionService, getId());
requestBuilder.setLocations(paths)
.setCluster(cluster.getName())
.setPollingFrequencyInMillis(SchedulerUtil
.getPollingFrequencyinMillis(process.getFrequency()))
.setTimeoutInMillis(getTimeOutInMillis())
.setLocations(paths)
.setInputName(input.getName())
.setExpType(exptype)
.setIsOptional(input.isOptional());
NotificationServicesRegistry.register(requestBuilder.build());
LOG.info("Registered for a data notification for process {} of instance time {} "
+ "for data location {}", process.getName(), getInstanceTime(),
StringUtils.join(paths, ","));
} else {
int startInstance = Math.abs(SchedulerUtil.getExpInstance(startTimeExp, exptype));
int endInstance = Math.abs(SchedulerUtil.getExpInstance(endTimeExp, exptype));
int noOfPaths = Math.abs(endInstance - startInstance) + 1;
Predicate predicate = Predicate.createDataPredicate(noOfPaths);

// check may be awaiting predicates already contains this
if (isResume && !awaitedPredicates.contains(predicate)) {
continue;
}
addDataPredicate(predicate);
DataAvailabilityService.DataRequestBuilder regexDataRequestBuilder =
(DataAvailabilityService.DataRequestBuilder)
NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.DATA)
.createRequestBuilder(executionService, getId());
Date referenceTime = new Date(System.currentTimeMillis()); // need to configure this.
regexDataRequestBuilder.setExpType(exptype)
.setCluster(cluster.getName())
.setPollingFrequencyInMillis(SchedulerUtil.
getPollingFrequencyinMillis(process.getFrequency()))
.setTimeoutInMillis(getTimeOutInMillis())
.setStartInstance(startInstance)
.setEndInstance(endInstance)
.setStartTimeInMillis(SchedulerUtil.getStartTimeInMillis(cluster.getValidity().getStart(),
feed.getFrequency(), process.getTimezone(), referenceTime, exptype))
.setEndTimeInMillis(SchedulerUtil.getEndTimeInMillis(cluster.getValidity().getStart(),
feed.getFrequency(), process.getTimezone(), referenceTime, exptype,
SchedulerUtil.getExpLimit(startTimeExp, exptype)))
.setBasePath(FeedHelper.getLocation(feed, clusterEntity, LocationType.DATA).getPath())
.setInputName(input.getName())
.setFrequencyInMillis(SchedulerUtil.getFrequencyInMillis(feed.getFrequency()))
.setIsOptional(input.isOptional());
NotificationServicesRegistry.register(regexDataRequestBuilder.build());
LOG.info("Registered for a data notification for process {} of instance time {} "
+ "for expression type {}", process.getName(), getInstanceTime(), exptype);
}
}
}
}

Predicate predicate = Predicate.createDataPredicate(paths);
// To ensure we evaluate only predicates not evaluated before when an instance is resumed.
if (isResume && !awaitedPredicates.contains(predicate)) {
continue;
/**
 * Collects the distinct paths of the feed instances returned by
 * EntityUtil.getEntityInstanceTimes for the given feed cluster and time range.
 *
 * Only locations of type DATA are used. When the feed has a non-empty availability
 * flag, the flag is appended to each instance path.
 *
 * @param cluster   feed cluster whose locations and instance times are read
 * @param feed      feed supplying the locations and the availability flag
 * @param startTime start of the range passed to EntityUtil.getEntityInstanceTimes
 * @param endTime   end of the range passed to EntityUtil.getEntityInstanceTimes
 * @return the collected paths without duplicates
 */
private List<Path> getPaths(org.apache.falcon.entity.v0.feed.Cluster cluster,
Feed feed, Date startTime, Date endTime) {
List<Path> paths = new ArrayList<>();
List<Location> locations = FeedHelper.getLocations(cluster, feed);
for (Location loc : locations) {
// Locations that are not of type DATA are skipped.
if (loc.getType() != LocationType.DATA) {
continue;
}
List<Date> instanceTimes = EntityUtil.getEntityInstanceTimes(feed, cluster.getName(),
startTime, endTime);
for (Date instanceTime : instanceTimes) {
String path = EntityUtil.evaluateDependentPath(loc.getPath(), instanceTime);
// Append the availability flag, separated by "/", when the feed defines one.
if (feed.getAvailabilityFlag() != null && !feed.getAvailabilityFlag().isEmpty()) {
if (!path.endsWith("/")) {
path = path + "/";
}
path = path + feed.getAvailabilityFlag();
}
// Keep each path only once.
if (!paths.contains(new Path(path))) {
paths.add(new Path(path));
}
// NOTE(review): the statements from here down to the LOG.info call use predicate and
// requestBuilder-related state that this method does not declare; they look like removed
// lines left over from the diff rendering - confirm against the actual file.
addDataPredicate(predicate);
DataAvailabilityService.DataRequestBuilder requestBuilder =
(DataAvailabilityService.DataRequestBuilder)
NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.DATA)
.createRequestBuilder(executionService, getId());
requestBuilder.setLocations(paths)
.setCluster(cluster.getName())
.setPollingFrequencyInMillis(SchedulerUtil.getPollingFrequencyinMillis(process.getFrequency()))
.setTimeoutInMillis(getTimeOutInMillis())
.setLocations(paths);
NotificationServicesRegistry.register(requestBuilder.build());
LOG.info("Registered for a data notification for process {} of instance time {} for data location {}",
process.getName(), getInstanceTime(), StringUtils.join(paths, ","));
}
}
return paths;
}

@Override
Expand All @@ -197,9 +250,17 @@ public void onEvent(Event event) throws FalconException {
setActualEnd(((JobCompletedEvent)event).getEndTime());
break;
case DATA_AVAILABLE:
DataEvent dataEvent = (DataEvent) event;
// Data has not become available and the wait time has passed
if (((DataEvent) event).getStatus() == DataEvent.STATUS.UNAVAILABLE) {
if (dataEvent.getStatus() == DataEvent.STATUS.UNAVAILABLE) {
hasTimedOut = true;
} else {
String feedName = dataEvent.getInputName();
if (this.getProperties() == null) {
this.setProperties(new Properties());
}
this.getProperties().setProperty(OozieUtils.FALCON_PROCESS_INPUT_PATHS
+ "." + feedName, StringUtils.join(dataEvent.getDataLocations(), ","));
}
// If the event matches any of the awaited predicates, remove the predicate of the awaited list
Predicate toRemove = null;
Expand Down
Loading