Apache NiFi: Missing Complete Authorization for Parameter and Service References
Description
Apache NiFi 1.10.0 through 2.0.0 are missing fine-grained authorization checking for Parameter Contexts, referenced Controller Services, and referenced Parameter Providers, when creating new Process Groups.
Creating a new Process Group can include binding to a Parameter Context, but in cases where the Process Group did not reference any Parameter values, the framework did not check user authorization for the bound Parameter Context. Missing authorization for a bound Parameter Context enabled clients to download non-sensitive Parameter values after creating the Process Group.
Creating a new Process Group can also include referencing existing Controller Services or Parameter Providers. The framework did not check user authorization for referenced Controller Services or Parameter Providers, enabling clients to create Process Groups and use these components that were otherwise unauthorized.
This vulnerability is limited in scope to authenticated users authorized to create Process Groups. The scope is further limited to deployments with component-based authorization policies. Upgrading to Apache NiFi 2.1.0 is the recommended mitigation, which includes authorization checking for Parameter and Controller Service references on Process Group creation.
AI Insight
LLM-synthesized narrative grounded in this CVE's description and references.
Apache NiFi 1.10.0 through 2.0.0 fails to enforce authorization for Parameter Contexts and Controller Services when creating Process Groups, enabling unauthorized access.
Vulnerability
Description
Apache NiFi versions 1.10.0 through 2.0.0 contain a missing authorization check when creating new Process Groups. The framework does not verify user permissions for bound Parameter Contexts, referenced Controller Services, or referenced Parameter Providers during Process Group creation [1][2]. This oversight occurs even when the Process Group does not reference any Parameter values, allowing the binding to a Parameter Context without proper authorization.
Exploitation
Conditions
An attacker must be an authenticated user authorized to create Process Groups. The vulnerability is further limited to deployments that use component-based authorization policies. When creating a Process Group, the user can bind to a Parameter Context or reference existing Controller Services and Parameter Providers without the framework checking whether the user has the required permissions for those components [1][2].
Impact
Successful exploitation enables an attacker to download non-sensitive Parameter values from a bound Parameter Context after creating the Process Group. Additionally, the attacker can create Process Groups that reference and use Controller Services or Parameter Providers for which they lack authorization, effectively bypassing access controls [1][2].
Mitigation
Apache NiFi 2.1.0 includes the necessary authorization checks for Parameter and Controller Service references on Process Group creation. Upgrading to this version is the recommended mitigation [1][2]. No workarounds are mentioned in the advisories.
AI Insight generated on May 20, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
org.apache.nifi:nifi-web-apiMaven | >= 1.10.0, < 2.1.0 | 2.1.0 |
Affected products
4- osv-coords2 versions
>= 1.10.0, < 2.1.0+ 1 more
- (no CPE)range: >= 1.10.0, < 2.1.0
- (no CPE)range: >= 1.10.0, < 2.1.0
- Apache Software Foundation/Apache NiFiv5Range: 1.10.0
Patches
1f744deebf9a9NIFI-13976 Added REST Endpoints for Copying and Pasting Flows (#9535)
42 files changed · +3043 −246
nifi-framework-api/src/main/java/org/apache/nifi/authorization/resource/VersionedComponentAuthorizable.java+32 −0 added@@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.authorization.resource; + +import java.util.Optional; + +/** + * ComponentAuthorizable that also exposes the versioned identifier of the underlying component. + */ +public interface VersionedComponentAuthorizable extends ComponentAuthorizable { + + /** + * The versioned component identifier of the underlying component. + * + * @return the versioned component identifier + */ + Optional<String> getVersionedComponentId(); +}
nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/CopyRequestEntity.java+144 −0 added@@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.entity; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.xml.bind.annotation.XmlType; + +import java.util.HashSet; +import java.util.Set; + +/** + * A request to copy a portion of the flow. + */ +@XmlType(name = "copyRequestEntity") +public class CopyRequestEntity extends Entity { + + private Set<String> processGroups = new HashSet<>(); + private Set<String> remoteProcessGroups = new HashSet<>(); + private Set<String> processors = new HashSet<>(); + private Set<String> inputPorts = new HashSet<>(); + private Set<String> outputPorts = new HashSet<>(); + private Set<String> connections = new HashSet<>(); + private Set<String> labels = new HashSet<>(); + private Set<String> funnels = new HashSet<>(); + + /** + * @return the ids of the connections to be copied. + */ + @Schema(description = "The ids of the connections to be copied." + ) + public Set<String> getConnections() { + return connections; + } + + public void setConnections(Set<String> connections) { + this.connections = connections; + } + + /** + * @return the ids of the funnels to be copied. + */ + @Schema(description = "The ids of the funnels to be copied." + ) + public Set<String> getFunnels() { + return funnels; + } + + public void setFunnels(Set<String> funnels) { + this.funnels = funnels; + } + + /** + * @return the ids of the input port to be copied. + */ + @Schema(description = "The ids of the input ports to be copied." + ) + public Set<String> getInputPorts() { + return inputPorts; + } + + public void setInputPorts(Set<String> inputPorts) { + this.inputPorts = inputPorts; + } + + /** + * @return the ids of the labels to be copied. + */ + @Schema(description = "The ids of the labels to be copied." + ) + public Set<String> getLabels() { + return labels; + } + + public void setLabels(Set<String> labels) { + this.labels = labels; + } + + /** + * @return the ids of the output ports to be copied. + */ + @Schema(description = "The ids of the output ports to be copied." + ) + public Set<String> getOutputPorts() { + return outputPorts; + } + + public void setOutputPorts(Set<String> outputPorts) { + this.outputPorts = outputPorts; + } + + /** + * @return The ids of the process groups to be copied. + */ + @Schema(description = "The ids of the process groups to be copied." + ) + public Set<String> getProcessGroups() { + return processGroups; + } + + public void setProcessGroups(Set<String> processGroups) { + this.processGroups = processGroups; + } + + /** + * @return The ids of the processors to be copied. + */ + @Schema(description = "The ids of the processors to be copied." + ) + public Set<String> getProcessors() { + return processors; + } + + public void setProcessors(Set<String> processors) { + this.processors = processors; + } + + /** + * @return the ids of the remote process groups to be copied. + */ + @Schema(description = "The ids of the remote process groups to be copied." + ) + public Set<String> getRemoteProcessGroups() { + return remoteProcessGroups; + } + + public void setRemoteProcessGroups(Set<String> remoteProcessGroups) { + this.remoteProcessGroups = remoteProcessGroups; + } + +}
nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/CopyResponseEntity.java+221 −0 added@@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.entity; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.xml.bind.annotation.XmlType; +import org.apache.nifi.flow.ExternalControllerServiceReference; +import org.apache.nifi.flow.ParameterProviderReference; +import org.apache.nifi.flow.VersionedConnection; +import org.apache.nifi.flow.VersionedFunnel; +import org.apache.nifi.flow.VersionedLabel; +import org.apache.nifi.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedPort; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flow.VersionedProcessor; +import org.apache.nifi.flow.VersionedRemoteProcessGroup; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * A response to copy a portion of the flow. + */ +@XmlType(name = "copyResponseEntity") +public class CopyResponseEntity extends Entity { + + private String id; + + private Map<String, ExternalControllerServiceReference> externalControllerServiceReferences = new HashMap<>(); + private Map<String, VersionedParameterContext> parameterContexts = new HashMap<>(); + private Map<String, ParameterProviderReference> parameterProviders = new HashMap<>(); + + private Set<VersionedProcessGroup> processGroups = new HashSet<>(); + private Set<VersionedRemoteProcessGroup> remoteProcessGroups = new HashSet<>(); + private Set<VersionedProcessor> processors = new HashSet<>(); + private Set<VersionedPort> inputPorts = new HashSet<>(); + private Set<VersionedPort> outputPorts = new HashSet<>(); + private Set<VersionedConnection> connections = new HashSet<>(); + private Set<VersionedLabel> labels = new HashSet<>(); + private Set<VersionedFunnel> funnels = new HashSet<>(); + + /** + * The id for this copy action. + * + * @return The id + */ + @Schema(description = "The id for this copy action." + ) + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + /** + * The external controller service references. + * + * @return The external controller service reference + */ + @Schema(description = "The external controller service references." + ) + public Map<String, ExternalControllerServiceReference> getExternalControllerServiceReferences() { + return externalControllerServiceReferences; + } + + public void setExternalControllerServiceReferences(Map<String, ExternalControllerServiceReference> externalControllerServiceReferences) { + this.externalControllerServiceReferences = externalControllerServiceReferences; + } + + /** + * The referenced parameter contexts. + * + * @return The referenced parameter contexts + */ + @Schema(description = "The referenced parameter contexts." + ) + public Map<String, VersionedParameterContext> getParameterContexts() { + return parameterContexts; + } + + public void setParameterContexts(Map<String, VersionedParameterContext> parameterContexts) { + this.parameterContexts = parameterContexts; + } + + /** + * The referenced parameter providers. + * + * @return The referenced parameter providers + */ + @Schema(description = "The referenced parameter providers." + ) + public Map<String, ParameterProviderReference> getParameterProviders() { + return parameterProviders; + } + + public void setParameterProviders(Map<String, ParameterProviderReference> parameterProviders) { + this.parameterProviders = parameterProviders; + } + + /** + * @return the connections being copied. + */ + @Schema(description = "The connections being copied." + ) + public Set<VersionedConnection> getConnections() { + return connections; + } + + public void setConnections(Set<VersionedConnection> connections) { + this.connections = connections; + } + + /** + * @return the funnels being copied. + */ + @Schema(description = "The funnels being copied." + ) + public Set<VersionedFunnel> getFunnels() { + return funnels; + } + + public void setFunnels(Set<VersionedFunnel> funnels) { + this.funnels = funnels; + } + + /** + * @return the input port being copied. + */ + @Schema(description = "The input ports being copied." + ) + public Set<VersionedPort> getInputPorts() { + return inputPorts; + } + + public void setInputPorts(Set<VersionedPort> inputPorts) { + this.inputPorts = inputPorts; + } + + /** + * @return the labels being copied. + */ + @Schema(description = "The labels being copied." + ) + public Set<VersionedLabel> getLabels() { + return labels; + } + + public void setLabels(Set<VersionedLabel> labels) { + this.labels = labels; + } + + /** + * @return the output ports being copied. + */ + @Schema(description = "The output ports being copied." + ) + public Set<VersionedPort> getOutputPorts() { + return outputPorts; + } + + public void setOutputPorts(Set<VersionedPort> outputPorts) { + this.outputPorts = outputPorts; + } + + /** + * @return The process groups being copied. + */ + @Schema(description = "The process groups being copied." + ) + public Set<VersionedProcessGroup> getProcessGroups() { + return processGroups; + } + + public void setProcessGroups(Set<VersionedProcessGroup> processGroups) { + this.processGroups = processGroups; + } + + /** + * @return The processors being copied. + */ + @Schema(description = "The processors being copied." + ) + public Set<VersionedProcessor> getProcessors() { + return processors; + } + + public void setProcessors(Set<VersionedProcessor> processors) { + this.processors = processors; + } + + /** + * @return the remote process groups being copied. + */ + @Schema(description = "The remote process groups being copied." + ) + public Set<VersionedRemoteProcessGroup> getRemoteProcessGroups() { + return remoteProcessGroups; + } + + public void setRemoteProcessGroups(Set<VersionedRemoteProcessGroup> remoteProcessGroups) { + this.remoteProcessGroups = remoteProcessGroups; + } +}
nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PasteRequestEntity.java+70 −0 added@@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.entity; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.xml.bind.annotation.XmlType; +import org.apache.nifi.web.api.dto.RevisionDTO; + +/** + * A request to paste a portion of the flow. + */ +@XmlType(name = "pasteRequestEntity") +public class PasteRequestEntity extends Entity { + + private CopyResponseEntity copyResponse; + + private RevisionDTO revision; + private Boolean disconnectedNodeAcknowledged; + + /** + * @return the response from copying + */ + @Schema(description = "The response from copying." + ) + public CopyResponseEntity getCopyResponse() { + return copyResponse; + } + + public void setCopyResponse(CopyResponseEntity copyResponse) { + this.copyResponse = copyResponse; + } + + /** + * @return revision for this request/response + */ + @Schema(description = "The revision for this request/response. The revision is required for any mutable flow requests and is included in all responses." + ) + public RevisionDTO getRevision() { + return revision; + } + + public void setRevision(RevisionDTO revision) { + this.revision = revision; + } + + /** + * @return Acknowledges if a node is disconnected from a cluster + */ + public Boolean getDisconnectedNodeAcknowledged() { + return disconnectedNodeAcknowledged; + } + + public void setDisconnectedNodeAcknowledged(Boolean disconnectedNodeAcknowledged) { + this.disconnectedNodeAcknowledged = disconnectedNodeAcknowledged; + } +}
nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/PasteResponseEntity.java+59 −0 added@@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.entity; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.xml.bind.annotation.XmlType; +import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.dto.flow.FlowDTO; + +/** + * A response to copy a portion of the flow. + */ +@XmlType(name = "pasteResponseEntity") +public class PasteResponseEntity extends Entity { + + private FlowDTO flow; + + private RevisionDTO revision; + + /** + * @return flow containing the components that were created as part of this paste action + */ + @Schema(description = "Flow containing the components that were created as part of this paste action." + ) + public FlowDTO getFlow() { + return flow; + } + + public void setFlow(FlowDTO flow) { + this.flow = flow; + } + + /** + * @return revision for this request/response + */ + @Schema(description = "The revision for this request/response. The revision is required for any mutable flow requests and is included in all responses." + ) + public RevisionDTO getRevision() { + return revision; + } + + public void setRevision(RevisionDTO revision) { + this.revision = revision; + } +}
nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessGroupFlowEntity.java+15 −0 modified@@ -18,6 +18,7 @@ import io.swagger.v3.oas.annotations.media.Schema; import org.apache.nifi.web.api.dto.PermissionsDTO; +import org.apache.nifi.web.api.dto.RevisionDTO; import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO; import jakarta.xml.bind.annotation.XmlRootElement; @@ -28,9 +29,23 @@ @XmlRootElement(name = "processGroupFlowEntity") public class ProcessGroupFlowEntity extends Entity { + private RevisionDTO revision; private PermissionsDTO permissions; private ProcessGroupFlowDTO processGroupFlow; + /** + * @return revision for this component + */ + @Schema(description = "The revision for this process group." + ) + public RevisionDTO getRevision() { + return revision; + } + + public void setRevision(RevisionDTO revision) { + this.revision = revision; + } + /** * The permissions for this component. *
nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/PasteEndpointMerger.java+145 −0 added@@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import org.apache.nifi.cluster.manager.ConnectionsEntityMerger; +import org.apache.nifi.cluster.manager.FunnelsEntityMerger; +import org.apache.nifi.cluster.manager.LabelsEntityMerger; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.PortsEntityMerger; +import org.apache.nifi.cluster.manager.ProcessGroupsEntityMerger; +import org.apache.nifi.cluster.manager.ProcessorsEntityMerger; +import org.apache.nifi.cluster.manager.RemoteProcessGroupsEntityMerger; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.flow.FlowDTO; +import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.FunnelEntity; +import org.apache.nifi.web.api.entity.LabelEntity; +import org.apache.nifi.web.api.entity.PasteResponseEntity; +import org.apache.nifi.web.api.entity.PortEntity; +import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +public class PasteEndpointMerger extends AbstractSingleDTOEndpoint<PasteResponseEntity, FlowDTO> { + public static final Pattern PASTE_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/paste"); + + @Override + public boolean canHandle(final URI uri, final String method) { + return "PUT".equalsIgnoreCase(method) && PASTE_INSTANCE_URI_PATTERN.matcher(uri.getPath()).matches(); + } + + @Override + protected Class<PasteResponseEntity> getEntityClass() { + return PasteResponseEntity.class; + } + + @Override + protected FlowDTO getDto(PasteResponseEntity entity) { + return entity.getFlow(); + } + + @Override + protected void mergeResponses(final FlowDTO clientDto, final Map<NodeIdentifier, FlowDTO> dtoMap, final Set<NodeResponse> successfulResponses, final Set<NodeResponse> problematicResponses) { + final Set<ConnectionEntity> clientConnections = clientDto.getConnections(); + final Set<ProcessorEntity> clientProcessors = clientDto.getProcessors(); + final Set<PortEntity> clientInputPorts = clientDto.getInputPorts(); + final Set<PortEntity> clientOutputPorts = clientDto.getOutputPorts(); + final Set<RemoteProcessGroupEntity> clientRemoteProcessGroups = clientDto.getRemoteProcessGroups(); + final Set<ProcessGroupEntity> clientProcessGroups = clientDto.getProcessGroups(); + final Set<LabelEntity> clientLabels = clientDto.getLabels(); + final Set<FunnelEntity> clientFunnels = clientDto.getFunnels(); + + final Map<String, Map<NodeIdentifier, ConnectionEntity>> connections = new HashMap<>(); + final Map<String, Map<NodeIdentifier, FunnelEntity>> funnels = new HashMap<>(); + final Map<String, Map<NodeIdentifier, PortEntity>> inputPorts = new HashMap<>(); + final Map<String, Map<NodeIdentifier, LabelEntity>> labels = new HashMap<>(); + final Map<String, Map<NodeIdentifier, PortEntity>> outputPorts = new HashMap<>(); + final Map<String, Map<NodeIdentifier, ProcessorEntity>> processors = new HashMap<>(); + final Map<String, Map<NodeIdentifier, RemoteProcessGroupEntity>> rpgs = new HashMap<>(); + final Map<String, Map<NodeIdentifier, ProcessGroupEntity>> processGroups = new HashMap<>(); + + // Create mapping of ComponentID -> [nodeId, entity on that node] + for (final Map.Entry<NodeIdentifier, FlowDTO> nodeFlowEntry : dtoMap.entrySet()) { + final NodeIdentifier nodeIdentifier = nodeFlowEntry.getKey(); + final FlowDTO nodeFlowDto = nodeFlowEntry.getValue(); + + for (final ConnectionEntity entity : nodeFlowDto.getConnections()) { + connections.computeIfAbsent(entity.getId(), id -> new HashMap<>()).computeIfAbsent(nodeIdentifier, nodeId -> entity); + } + + for (final FunnelEntity entity : nodeFlowDto.getFunnels()) { + funnels.computeIfAbsent(entity.getId(), id -> new HashMap<>()).computeIfAbsent(nodeIdentifier, nodeId -> entity); + } + + for (final PortEntity entity : nodeFlowDto.getInputPorts()) { + inputPorts.computeIfAbsent(entity.getId(), id -> new HashMap<>()).computeIfAbsent(nodeIdentifier, nodeId -> entity); + } + + for (final PortEntity entity : nodeFlowDto.getOutputPorts()) { + outputPorts.computeIfAbsent(entity.getId(), id -> new HashMap<>()).computeIfAbsent(nodeIdentifier, nodeId -> entity); + } + + for (final LabelEntity entity : nodeFlowDto.getLabels()) { + labels.computeIfAbsent(entity.getId(), id -> new HashMap<>()).computeIfAbsent(nodeIdentifier, nodeId -> entity); + } + + for (final ProcessorEntity entity : nodeFlowDto.getProcessors()) { + processors.computeIfAbsent(entity.getId(), id -> new HashMap<>()).computeIfAbsent(nodeIdentifier, nodeId -> entity); + } + + for (final RemoteProcessGroupEntity entity : nodeFlowDto.getRemoteProcessGroups()) { + rpgs.computeIfAbsent(entity.getId(), id -> new HashMap<>()).computeIfAbsent(nodeIdentifier, nodeId -> entity); + } + + for (final ProcessGroupEntity entity : nodeFlowDto.getProcessGroups()) { + processGroups.computeIfAbsent(entity.getId(), id -> new HashMap<>()).computeIfAbsent(nodeIdentifier, nodeId -> entity); + } + } + + // Merge connections + ConnectionsEntityMerger.mergeConnections(clientConnections, connections); + + // Merge funnel statuses + FunnelsEntityMerger.mergeFunnels(clientFunnels, funnels); + + // Merge input ports + PortsEntityMerger.mergePorts(clientInputPorts, inputPorts); + + // Merge output ports + PortsEntityMerger.mergePorts(clientOutputPorts, outputPorts); + + // Merge labels + LabelsEntityMerger.mergeLabels(clientLabels, labels); + + // Merge processors + ProcessorsEntityMerger.mergeProcessors(clientProcessors, processors); + + // Merge Remote Process Groups + RemoteProcessGroupsEntityMerger.mergeRemoteProcessGroups(clientRemoteProcessGroups, rpgs); + + // Merge Process Groups + ProcessGroupsEntityMerger.mergeProcessGroups(clientProcessGroups, processGroups); + } +}
nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java+2 −0 modified@@ -65,6 +65,7 @@ import org.apache.nifi.cluster.coordination.http.endpoints.ParameterProviderEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ParameterProviderFetchRequestsEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ParameterProvidersEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.PasteEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.PortEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.PortStatusEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.PrioritizerTypesEndpointMerger; @@ -145,6 +146,7 @@ public StandardHttpResponseMapper(final NiFiProperties nifiProperties) { endpointMergers.add(new ProcessGroupEndpointMerger()); endpointMergers.add(new ProcessGroupsEndpointMerger()); endpointMergers.add(new FlowSnippetEndpointMerger()); + endpointMergers.add(new PasteEndpointMerger()); endpointMergers.add(new ProvenanceQueryEndpointMerger()); endpointMergers.add(new ProvenanceEventEndpointMerger()); endpointMergers.add(new LatestProvenanceEventsMerger());
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceResolver.java+20 −7 modified@@ -62,7 +62,7 @@ public StandardControllerServiceResolver(final Authorizer authorizer, } @Override - public void resolveInheritedControllerServices(final FlowSnapshotContainer flowSnapshotContainer, final String parentGroupId, final NiFiUser user) { + public Set<String> resolveInheritedControllerServices(final FlowSnapshotContainer flowSnapshotContainer, final String parentGroupId, final NiFiUser user) { final RegisteredFlowSnapshot topLevelSnapshot = flowSnapshotContainer.getFlowSnapshot(); final VersionedProcessGroup versionedGroup = topLevelSnapshot.getFlowContents(); final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences = topLevelSnapshot.getExternalControllerServices(); @@ -77,12 +77,15 @@ public void resolveInheritedControllerServices(final FlowSnapshotContainer flowS final Stack<Set<VersionedControllerService>> serviceHierarchyStack = new Stack<>(); serviceHierarchyStack.push(ancestorServices); - resolveInheritedControllerServices(flowSnapshotContainer, versionedGroup, externalControllerServiceReferences, serviceHierarchyStack); + final Set<String> unresolvedServices = new HashSet<>(); + resolveInheritedControllerServices(flowSnapshotContainer, versionedGroup, externalControllerServiceReferences, serviceHierarchyStack, unresolvedServices); + return unresolvedServices; } private void resolveInheritedControllerServices(final FlowSnapshotContainer flowSnapshotContainer, final VersionedProcessGroup versionedGroup, final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences, - final Stack<Set<VersionedControllerService>> serviceHierarchyStack) { + final Stack<Set<VersionedControllerService>> serviceHierarchyStack, + final Set<String> unresolvedServices) { final Set<VersionedControllerService> currentGroupServices = versionedGroup.getControllerServices() == null ? Collections.emptySet() : versionedGroup.getControllerServices(); serviceHierarchyStack.push(currentGroupServices); @@ -92,11 +95,11 @@ private void resolveInheritedControllerServices(final FlowSnapshotContainer flow .collect(Collectors.toSet()); for (final VersionedProcessor processor : versionedGroup.getProcessors()) { - resolveInheritedControllerServices(processor, availableControllerServices, externalControllerServiceReferences); + resolveInheritedControllerServices(processor, availableControllerServices, externalControllerServiceReferences, unresolvedServices); } for (final VersionedControllerService service : versionedGroup.getControllerServices()) { - resolveInheritedControllerServices(service, availableControllerServices, externalControllerServiceReferences); + resolveInheritedControllerServices(service, availableControllerServices, externalControllerServiceReferences, unresolvedServices); } // If the child group is under version, the external service references need to come from the snapshot of the @@ -113,14 +116,15 @@ private void resolveInheritedControllerServices(final FlowSnapshotContainer flow childExternalServices = childSnapshot.getExternalControllerServices(); } } - resolveInheritedControllerServices(flowSnapshotContainer, child, childExternalServices, serviceHierarchyStack); + resolveInheritedControllerServices(flowSnapshotContainer, child, childExternalServices, serviceHierarchyStack, unresolvedServices); } serviceHierarchyStack.pop(); } private void resolveInheritedControllerServices(final VersionedConfigurableExtension component, final Set<VersionedControllerService> availableControllerServices, - final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences) { + final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences, + final Set<String> unresolvedServices) { final Map<String, ControllerServiceAPI> componentRequiredApis = controllerServiceApiLookup.getRequiredServiceApis(component.getType(), component.getBundle()); if (componentRequiredApis.isEmpty()) { @@ -134,6 +138,11 @@ private void resolveInheritedControllerServices(final VersionedConfigurableExten final String propertyName = entry.getKey(); final String propertyValue = entry.getValue(); + // if the property isn't set there is nothing to resolve + if (propertyValue == null) { + continue; + } + final VersionedPropertyDescriptor propertyDescriptor = propertyDescriptors.get(propertyName); if (propertyDescriptor == null) { continue; @@ -149,16 +158,19 @@ private void resolveInheritedControllerServices(final VersionedConfigurableExten // If the referenced Controller Service is available, there is nothing to resolve. if (availableControllerServiceIds.contains(propertyValue)) { + unresolvedServices.add(propertyValue); continue; } final ExternalControllerServiceReference externalServiceReference = externalControllerServiceReferences == null ? null : externalControllerServiceReferences.get(propertyValue); if (externalServiceReference == null) { + unresolvedServices.add(propertyValue); continue; } final ControllerServiceAPI descriptorRequiredApi = componentRequiredApis.get(propertyName); if (descriptorRequiredApi == null) { + unresolvedServices.add(propertyValue); continue; } @@ -169,6 +181,7 @@ private void resolveInheritedControllerServices(final VersionedConfigurableExten .collect(Collectors.toList()); if (matchingControllerServices.size() != 1) { + unresolvedServices.add(propertyValue); continue; }
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java+200 −38 modified@@ -74,6 +74,7 @@ import org.apache.nifi.flow.VersionedRemoteProcessGroup; import org.apache.nifi.flow.VersionedReportingTask; import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.groups.ComponentAdditions; import org.apache.nifi.groups.ComponentIdGenerator; import org.apache.nifi.groups.FlowFileConcurrency; import org.apache.nifi.groups.FlowFileOutboundPolicy; @@ -84,6 +85,7 @@ import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; import org.apache.nifi.groups.StandardVersionedFlowStatus; +import org.apache.nifi.groups.VersionedComponentAdditions; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.migration.ControllerServiceFactory; import org.apache.nifi.migration.StandardControllerServiceFactory; @@ -167,6 +169,146 @@ public void setSynchronizationOptions(final FlowSynchronizationOptions syncOptio this.syncOptions = syncOptions; } + @Override + public ComponentAdditions addVersionedComponentsToProcessGroup(final ProcessGroup group, final VersionedComponentAdditions additions, final FlowSynchronizationOptions options) { + updatedVersionedComponentIds.clear(); + createdAndModifiedExtensions.clear(); + setSynchronizationOptions(options); + + final ComponentAdditions.Builder additionsBuilder = new ComponentAdditions.Builder(); + + // add any controller services first since they may be referenced by components to follow + final Map<VersionedControllerService, ControllerServiceNode> instanceMapping = new HashMap<>(); + additions.getControllerServices().forEach(controllerService -> { + final ControllerServiceNode newService = addControllerService(group, controllerService, options.getComponentIdGenerator(), group); + instanceMapping.put(controllerService, newService); + additionsBuilder.addControllerService(newService); + }); + + // go through the controller services again and update each to update any service references + // to their new identifiers + additions.getControllerServices().forEach(controllerService -> { + final ControllerServiceNode newService = instanceMapping.get(controllerService); + if (newService != null) { + updateControllerService(newService, controllerService, group); + } + }); + + // add any processors + additions.getProcessors().forEach(processor -> { + try { + final ProcessorNode newProcessor = addProcessor(group, processor, options.getComponentIdGenerator(), group); + additionsBuilder.addProcessor(newProcessor); + } catch (final ProcessorInstantiationException pie) { + throw new RuntimeException(pie); + } + }); + + // track the proposed port names so they can be updated after adding with guaranteed unique names + final Map<Port, String> proposedPortFinalNames = new HashMap<>(); + final Set<String> existingInputPorts = group.getInputPorts().stream().map(Port::getName).collect(Collectors.toSet()); + final Set<String> existingOutputPorts = group.getOutputPorts().stream().map(Port::getName).collect(Collectors.toSet()); + + // add any input ports + additions.getInputPorts().forEach(inputPort -> { + // if we're adding to the root group than ports must allow remote access + if (group.isRootGroup()) { + inputPort.setAllowRemoteAccess(true); + } + + final String temporaryName = generateTemporaryPortName(inputPort); + final Port newInputPort = addInputPort(group, inputPort, options.getComponentIdGenerator(), temporaryName); + + // if the proposed port name does not conflict with any existing ports include the proposed name for updating later + if (!existingInputPorts.contains(inputPort.getName())) { + proposedPortFinalNames.put(newInputPort, inputPort.getName()); + } + + additionsBuilder.addInputPort(newInputPort); + }); + + // add any output ports + additions.getOutputPorts().forEach(outputPort -> { + // if we're adding to the root group than ports must allow remote access + if (group.isRootGroup()) { + outputPort.setAllowRemoteAccess(true); + } + + final String temporaryName = generateTemporaryPortName(outputPort); + final Port newOutputPort = addOutputPort(group, outputPort, options.getComponentIdGenerator(), temporaryName); + + // if the proposed port name does not conflict with any existing ports include the proposed name for updating later + if (!existingOutputPorts.contains(outputPort.getName())) { + proposedPortFinalNames.put(newOutputPort, outputPort.getName()); + } + + additionsBuilder.addOutputPort(newOutputPort); + }); + + // add any labels + additions.getLabels().forEach(label -> { + final Label newLabel = addLabel(group, label, options.getComponentIdGenerator()); + additionsBuilder.addLabel(newLabel); + }); + + // add any funnels + additions.getFunnels().forEach(funnel -> { + final Funnel newFunnel = addFunnel(group, funnel, options.getComponentIdGenerator()); + additionsBuilder.addFunnel(newFunnel); + }); + + // add any remote process groups + additions.getRemoteProcessGroups().forEach(remoteProcessGroup -> { + final RemoteProcessGroup newRemoteProcessGroup = addRemoteProcessGroup(group, remoteProcessGroup, options.getComponentIdGenerator()); + additionsBuilder.addRemoteProcessGroup(newRemoteProcessGroup); + }); + + // add any process groups + additions.getProcessGroups().forEach(processGroup -> { + try { + final ProcessGroup newProcessGroup = addProcessGroup(group, processGroup, options.getComponentIdGenerator(), + additions.getParameterContexts(), additions.getParameterProviders(), group); + additionsBuilder.addProcessGroup(newProcessGroup); + } catch (final ProcessorInstantiationException pie) { + throw new RuntimeException(pie); + } + }); + + // lastly add any connections with all source/destinations already added + additions.getConnections().forEach(connection -> { + // null out any instance id's in the connections source/destination since that would be favored + // when attaching the connection to the appropriate components + if (connection.getSource() != null) { + connection.getSource().setInstanceIdentifier(null); + } + if (connection.getDestination() != null) { + connection.getDestination().setInstanceIdentifier(null); + } + + final Connection newConnection = addConnection(group, connection, options.getComponentIdGenerator()); + additionsBuilder.addConnection(newConnection); + }); + + // update ports to final names + updatePortsToFinalNames(proposedPortFinalNames); + + for (final CreatedOrModifiedExtension createdOrModifiedExtension : createdAndModifiedExtensions) { + final ComponentNode extension = createdOrModifiedExtension.extension(); + final Map<String, String> originalPropertyValues = createdOrModifiedExtension.propertyValues(); + + final ControllerServiceFactory serviceFactory = new StandardControllerServiceFactory(context.getExtensionManager(), context.getFlowManager(), + context.getControllerServiceProvider(), extension); + + if (extension instanceof final ProcessorNode processor) { + processor.migrateConfiguration(originalPropertyValues, serviceFactory); + } else if (extension instanceof final ControllerServiceNode service) { + service.migrateConfiguration(originalPropertyValues, serviceFactory); + } + } + + return additionsBuilder.build(); + } + @Override public void synchronize(final ProcessGroup group, final VersionedExternalFlow versionedExternalFlow, final FlowSynchronizationOptions options) { final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(context.getExtensionManager(), context.getFlowMappingOptions()); @@ -363,21 +505,24 @@ private void synchronize(final ProcessGroup group, final VersionedProcessGroup p flowState = remoteCoordinates.getLatest() ? VersionedFlowState.UP_TO_DATE : VersionedFlowState.STALE; } - final VersionControlInformation vci = new StandardVersionControlInformation.Builder() - .registryId(registryId) - .registryName(registryName) - .branch(branch) - .bucketId(bucketId) - .bucketName(bucketId) - .flowId(flowId) - .storageLocation(storageLocation) - .flowName(flowId) - .version(version) - .flowSnapshot(syncOptions.isUpdateGroupVersionControlSnapshot() ? proposed : null) - .status(new StandardVersionedFlowStatus(flowState, flowState.getDescription())) - .build(); + // only attempt to set the version control information when an applicable registry client could be discovered + if (registryId != null) { + final VersionControlInformation vci = new StandardVersionControlInformation.Builder() + .registryId(registryId) + .registryName(registryName) + .branch(branch) + .bucketId(bucketId) + .bucketName(bucketId) + .flowId(flowId) + .storageLocation(storageLocation) + .flowName(flowId) + .version(version) + .flowSnapshot(syncOptions.isUpdateGroupVersionControlSnapshot() ? proposed : null) + .status(new StandardVersionedFlowStatus(flowState, flowState.getDescription())) + .build(); - group.setVersionControlInformation(vci, Collections.emptyMap()); + group.setVersionControlInformation(vci, Collections.emptyMap()); + } } // In order to properly update all of the components, we have to follow a specific order of operations, in order to ensure that @@ -1078,6 +1223,13 @@ private void synchronizeRemoteGroups(final ProcessGroup group, final VersionedPr } } + @Override + public void verifyCanAddVersionedComponents(final ProcessGroup group, final VersionedComponentAdditions additions) { + verifyCanInstantiateProcessors(group, additions.getProcessors(), additions.getProcessGroups()); + verifyCanInstantiateControllerServices(group, additions.getControllerServices(), additions.getProcessGroups()); + verifyCanInstantiateConnections(group, additions.getConnections(), additions.getProcessGroups()); + } + @Override public void verifyCanSynchronize(final ProcessGroup group, final VersionedProcessGroup flowContents, final boolean verifyConnectionRemoval) { // Optionally check that no deleted connections contain data in their queue. @@ -1124,13 +1276,19 @@ public void verifyCanSynchronize(final ProcessGroup group, final VersionedProces } } + verifyCanInstantiateProcessors(group, flowContents.getProcessors(), flowContents.getProcessGroups()); + verifyCanInstantiateControllerServices(group, flowContents.getControllerServices(), flowContents.getProcessGroups()); + verifyCanInstantiateConnections(group, flowContents.getConnections(), flowContents.getProcessGroups()); + } + + private void verifyCanInstantiateProcessors(final ProcessGroup group, final Set<VersionedProcessor> processors, final Set<VersionedProcessGroup> childGroups) { // Ensure that all Processors are instantiable final Map<String, VersionedProcessor> proposedProcessors = new HashMap<>(); - findAllProcessors(flowContents, proposedProcessors); + findAllProcessors(processors, childGroups, proposedProcessors); group.findAllProcessors() - .forEach(proc -> proposedProcessors.remove(proc.getVersionedComponentId().orElse( - NiFiRegistryFlowMapper.generateVersionedComponentId(proc.getIdentifier())))); + .forEach(proc -> proposedProcessors.remove(proc.getVersionedComponentId().orElse( + NiFiRegistryFlowMapper.generateVersionedComponentId(proc.getIdentifier())))); for (final VersionedProcessor processorToAdd : proposedProcessors.values()) { final String processorToAddClass = processorToAdd.getType(); @@ -1145,21 +1303,23 @@ public void verifyCanSynchronize(final ProcessGroup group, final VersionedProces // Could not resolve the bundle explicitly. Check for possible bundles. final List<org.apache.nifi.bundle.Bundle> possibleBundles = context.getExtensionManager().getBundles(processorToAddClass); final boolean bundleExists = possibleBundles.stream() - .anyMatch(b -> processorToAddCoordinate.equals(b.getBundleDetails().getCoordinate())); + .anyMatch(b -> processorToAddCoordinate.equals(b.getBundleDetails().getCoordinate())); if (!bundleExists && possibleBundles.size() != 1) { LOG.warn("Unknown bundle {} for processor type {} - will use Ghosted component instead", processorToAddCoordinate, processorToAddClass); } } } + } + private void verifyCanInstantiateControllerServices(final ProcessGroup group, final Set<VersionedControllerService> controllerServices, final Set<VersionedProcessGroup> childGroups) { // Ensure that all Controller Services are instantiable final Map<String, VersionedControllerService> proposedServices = new HashMap<>(); - findAllControllerServices(flowContents, proposedServices); + findAllControllerServices(controllerServices, childGroups, proposedServices); group.findAllControllerServices() - .forEach(service -> proposedServices.remove(service.getVersionedComponentId().orElse( - NiFiRegistryFlowMapper.generateVersionedComponentId(service.getIdentifier())))); + .forEach(service -> proposedServices.remove(service.getVersionedComponentId().orElse( + NiFiRegistryFlowMapper.generateVersionedComponentId(service.getIdentifier())))); for (final VersionedControllerService serviceToAdd : proposedServices.values()) { final String serviceToAddClass = serviceToAdd.getType(); @@ -1169,24 +1329,26 @@ public void verifyCanSynchronize(final ProcessGroup group, final VersionedProces if (resolved == null) { final List<org.apache.nifi.bundle.Bundle> possibleBundles = context.getExtensionManager().getBundles(serviceToAddClass); final boolean bundleExists = possibleBundles.stream() - .anyMatch(b -> serviceToAddCoordinate.equals(b.getBundleDetails().getCoordinate())); + .anyMatch(b -> serviceToAddCoordinate.equals(b.getBundleDetails().getCoordinate())); if (!bundleExists && possibleBundles.size() != 1) { LOG.warn("Unknown bundle {} for processor type {} - will use Ghosted component instead", serviceToAddCoordinate, serviceToAddClass); } } } + } + private void verifyCanInstantiateConnections(final ProcessGroup group, final Set<VersionedConnection> connections, final Set<VersionedProcessGroup> childGroups) { // Ensure that all Prioritizers are instantiable and that any load balancing configuration is correct // Enforcing ancestry on connection matching here is not important because all we're interested in is locating // new prioritizers and load balance strategy types so if a matching connection existed anywhere in the current // flow, then its prioritizer and load balance strategy are already validated final Map<String, VersionedConnection> proposedConnections = new HashMap<>(); - findAllConnections(flowContents, proposedConnections); + findAllConnections(connections, childGroups, proposedConnections); group.findAllConnections() - .forEach(conn -> proposedConnections.remove(conn.getVersionedComponentId().orElse( - NiFiRegistryFlowMapper.generateVersionedComponentId(conn.getIdentifier())))); + .forEach(conn -> proposedConnections.remove(conn.getVersionedComponentId().orElse( + NiFiRegistryFlowMapper.generateVersionedComponentId(conn.getIdentifier())))); for (final VersionedConnection connectionToAdd : proposedConnections.values()) { if (connectionToAdd.getPrioritizers() != null) { @@ -1205,7 +1367,7 @@ public void verifyCanSynchronize(final ProcessGroup group, final VersionedProces LoadBalanceStrategy.valueOf(loadBalanceStrategyName); } catch (final IllegalArgumentException iae) { throw new IllegalArgumentException("Unable to create Connection with Load Balance Strategy of '" + loadBalanceStrategyName - + "' because this is not a known Load Balance Strategy"); + + "' because this is not a known Load Balance Strategy"); } } } @@ -3725,33 +3887,33 @@ private boolean matchesGroupId(final ProcessGroup group, final String groupId) { NiFiRegistryFlowMapper.generateVersionedComponentId(group.getIdentifier())).equals(groupId); } - private void findAllProcessors(final VersionedProcessGroup group, final Map<String, VersionedProcessor> map) { - for (final VersionedProcessor processor : group.getProcessors()) { + private void findAllProcessors(final Set<VersionedProcessor> processors, final Set<VersionedProcessGroup> childGroups, final Map<String, VersionedProcessor> map) { + for (final VersionedProcessor processor : processors) { map.put(processor.getIdentifier(), processor); } - for (final VersionedProcessGroup childGroup : group.getProcessGroups()) { - findAllProcessors(childGroup, map); + for (final VersionedProcessGroup childGroup : childGroups) { + findAllProcessors(childGroup.getProcessors(), childGroup.getProcessGroups(), map); } } - private void findAllControllerServices(final VersionedProcessGroup group, final Map<String, VersionedControllerService> map) { - for (final VersionedControllerService service : group.getControllerServices()) { + private void findAllControllerServices(final Set<VersionedControllerService> controllerServices, final Set<VersionedProcessGroup> childGroups, final Map<String, VersionedControllerService> map) { + for (final VersionedControllerService service : controllerServices) { map.put(service.getIdentifier(), service); } - for (final VersionedProcessGroup childGroup : group.getProcessGroups()) { - findAllControllerServices(childGroup, map); + for (final VersionedProcessGroup childGroup : childGroups) { + findAllControllerServices(childGroup.getControllerServices(), childGroup.getProcessGroups(), map); } } - private void findAllConnections(final VersionedProcessGroup group, final Map<String, VersionedConnection> map) { - for (final VersionedConnection connection : group.getConnections()) { + private void findAllConnections(final Set<VersionedConnection> connections, final Set<VersionedProcessGroup> childGroups, final Map<String, VersionedConnection> map) { + for (final VersionedConnection connection : connections) { map.put(connection.getIdentifier(), connection); } - for (final VersionedProcessGroup childGroup : group.getProcessGroups()) { - findAllConnections(childGroup, map); + for (final VersionedProcessGroup childGroup : childGroups) { + findAllConnections(childGroup.getConnections(), childGroup.getProcessGroups(), map); } }
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/VersionedComponentSynchronizer.java+20 −0 modified@@ -40,15 +40,35 @@ import org.apache.nifi.flow.VersionedProcessor; import org.apache.nifi.flow.VersionedRemoteProcessGroup; import org.apache.nifi.flow.VersionedReportingTask; +import org.apache.nifi.groups.ComponentAdditions; import org.apache.nifi.groups.FlowSynchronizationOptions; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.groups.VersionedComponentAdditions; import org.apache.nifi.parameter.ParameterContext; import java.util.concurrent.TimeoutException; public interface VersionedComponentSynchronizer { + /** + * Adds versioned components to the specified Process Group. + * + * @param group the Process Group to append to + * @param additions the component additions to add to the Process Group + * @param options sync options + * @return the component additions + */ + ComponentAdditions addVersionedComponentsToProcessGroup(ProcessGroup group, VersionedComponentAdditions additions, FlowSynchronizationOptions options); + + /** + * Verifies that the given additions can be applied to the specified process group. + * + * @param group the Process Group that will be appended to + * @param additions the component additions that will be added to the Process Group + */ + void verifyCanAddVersionedComponents(ProcessGroup group, VersionedComponentAdditions additions); + /** * Synchronize the given Process Group to match the proposed flow * @param group the Process Group to update
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java+24 −0 modified@@ -3812,6 +3812,30 @@ public void synchronizeWithFlowRegistry(final FlowManager flowManager) { } } + @Override + public ComponentAdditions addVersionedComponents(final VersionedComponentAdditions additions, final String componentIdSeed) { + final ComponentIdGenerator idGenerator = (proposedId, instanceId, destinationGroupId) -> generateUuid(proposedId, destinationGroupId, componentIdSeed); + + final FlowSynchronizationOptions synchronizationOptions = new FlowSynchronizationOptions.Builder() + .componentIdGenerator(idGenerator) + .componentComparisonIdLookup(VersionedComponent::getIdentifier) + .componentScheduler(ComponentScheduler.NOP_SCHEDULER) + .propertyDecryptor(value -> null) + .build(); + + writeLock.lock(); + try { + final VersionedFlowSynchronizationContext groupSynchronizationContext = createGroupSynchronizationContext( + synchronizationOptions.getComponentIdGenerator(), synchronizationOptions.getComponentScheduler(), FlowMappingOptions.DEFAULT_OPTIONS); + final StandardVersionedComponentSynchronizer synchronizer = new StandardVersionedComponentSynchronizer(groupSynchronizationContext); + + synchronizer.verifyCanAddVersionedComponents(this, additions); + return synchronizer.addVersionedComponentsToProcessGroup(this, additions, synchronizationOptions); + } finally { + writeLock.unlock(); + } + } + @Override public void updateFlow(final VersionedExternalFlow proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty, final boolean updateSettings, final boolean updateDescendantVersionedFlows) {
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/parameter/StandardParameterContext.java+1 −1 modified@@ -485,7 +485,7 @@ public void configureParameterProvider(final ParameterProviderConfiguration para if (hasUserEnteredParameters) { throw new IllegalArgumentException(String.format("A Parameter Provider [%s] cannot be set since there are already user-entered parameters " + - "in Context [%s]", parameterProvider.getIdentifier(), name)); + "in Context [%s]", parameterProviderNode.getIdentifier(), name)); } this.parameterProvider = parameterProviderNode.getParameterProvider();
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java+7 −3 modified@@ -311,9 +311,13 @@ private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, fin } private String getId(final Optional<String> currentVersionedId, final String componentId) { - final String versionedId = flowMappingOptions.getComponentIdLookup().getComponentId(currentVersionedId, componentId); - versionedComponentIds.put(componentId, versionedId); - return versionedId; + if (versionedComponentIds.containsKey(componentId)) { + return versionedComponentIds.get(componentId); + } else { + final String versionedId = flowMappingOptions.getComponentIdLookup().getComponentId(currentVersionedId, componentId); + versionedComponentIds.put(componentId, versionedId); + return versionedId; + } } /**
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java+34 −0 modified@@ -63,6 +63,7 @@ import org.apache.nifi.groups.FlowSynchronizationOptions; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ScheduledStateChangeListener; +import org.apache.nifi.groups.VersionedComponentAdditions; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.parameter.Parameter; @@ -127,6 +128,7 @@ import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -390,6 +392,38 @@ public void testSynchronizeProcessorAddedMigrated() { assertEquals(ENCODED_TEXT, propertyValue); } + @Test + public void testAddVersionedComponents() { + final VersionedControllerService versionedService = createMinimalVersionedControllerService(); + + final Map<String, String> versionedProperties = Collections.singletonMap(SENSITIVE_PROPERTY_NAME, ENCRYPTED_PROPERTY_VALUE); + final VersionedProcessor versionedProcessor = createMinimalVersionedProcessor(); + versionedProcessor.setProperties(versionedProperties); + + final ProcessorNode processorNode = createMockProcessor(); + when(flowManager.createProcessor(any(), any(), any(), eq(true))).thenReturn(processorNode); + + final VersionedComponentAdditions additions = new VersionedComponentAdditions.Builder() + .setProcessors(Set.of(versionedProcessor)) + .setControllerServices(Set.of(versionedService)) + .build(); + + synchronizer.addVersionedComponentsToProcessGroup(group, additions, synchronizationOptions); + + verify(group).addProcessor(processorNode); + verify(processorNode).migrateConfiguration(propertiesCaptor.capture(), any()); + Map<String, String> migratedProperties = propertiesCaptor.getValue(); + String propertyValue = migratedProperties.get(SENSITIVE_PROPERTY_NAME); + assertEquals(ENCODED_TEXT, propertyValue); + + verify(group).addControllerService(any(ControllerServiceNode.class)); + verify(controllerServiceNode, atLeastOnce()).setName(eq(versionedService.getName())); + verify(controllerServiceNode).migrateConfiguration(propertiesCaptor.capture(), any()); + migratedProperties = propertiesCaptor.getValue(); + propertyValue = migratedProperties.get("abc"); + assertEquals("123", propertyValue); + } + @Test public void testAddProcessorWithServiceAndMigration() { final ProcessGroup processGroup = createMockProcessGroup();
nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceResolver.java+4 −1 modified@@ -19,6 +19,8 @@ import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.registry.flow.FlowSnapshotContainer; +import java.util.Set; + public interface ControllerServiceResolver { /** @@ -34,7 +36,8 @@ public interface ControllerServiceResolver { * snapshots of any child groups that are also under version control * @param parentGroupId the id of the process group where the snapshot is being imported * @param user the user performing the import + * @return Any unresolved Controller Services */ - void resolveInheritedControllerServices(FlowSnapshotContainer flowSnapshotContainer, String parentGroupId, NiFiUser user); + Set<String> resolveInheritedControllerServices(FlowSnapshotContainer flowSnapshotContainer, String parentGroupId, NiFiUser user); }
nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ComponentAdditions.java+151 −0 added@@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.groups; + +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.connectable.Funnel; +import org.apache.nifi.connectable.Port; +import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.label.Label; +import org.apache.nifi.controller.service.ControllerServiceNode; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +public class ComponentAdditions { + + private Set<ProcessGroup> processGroups; + private Set<RemoteProcessGroup> remoteProcessGroups; + private Set<ProcessorNode> processors; + private Set<Port> inputPorts; + private Set<Port> outputPorts; + private Set<Connection> connections; + private Set<Label> labels; + private Set<Funnel> funnels; + private Set<ControllerServiceNode> controllerServices; + + private ComponentAdditions(final Builder builder) { + this.processGroups = Collections.unmodifiableSet(builder.processGroups); + this.remoteProcessGroups = Collections.unmodifiableSet(builder.remoteProcessGroups); + this.processors = Collections.unmodifiableSet(builder.processors); + this.inputPorts = Collections.unmodifiableSet(builder.inputPorts); + this.outputPorts = Collections.unmodifiableSet(builder.outputPorts); + this.connections = Collections.unmodifiableSet(builder.connections); + this.labels = Collections.unmodifiableSet(builder.labels); + this.funnels = Collections.unmodifiableSet(builder.funnels); + this.controllerServices = Collections.unmodifiableSet(builder.controllerServices); + } + + public Set<ProcessGroup> getProcessGroups() { + return processGroups; + } + + public Set<RemoteProcessGroup> getRemoteProcessGroups() { + return remoteProcessGroups; + } + + public Set<ProcessorNode> getProcessors() { + return processors; + } + + public Set<Port> getInputPorts() { + return inputPorts; + } + + public Set<Port> getOutputPorts() { + return outputPorts; + } + + public Set<Connection> getConnections() { + return connections; + } + + public Set<Label> getLabels() { + return labels; + } + + public Set<Funnel> getFunnels() { + return funnels; + } + + public Set<ControllerServiceNode> getControllerServices() { + return controllerServices; + } + + public static class Builder { + private Set<ProcessGroup> processGroups = new HashSet<>(); + private Set<RemoteProcessGroup> remoteProcessGroups = new HashSet<>(); + private Set<ProcessorNode> processors = new HashSet<>(); + private Set<Port> inputPorts = new HashSet<>(); + private Set<Port> outputPorts = new HashSet<>(); + private Set<Connection> connections = new HashSet<>(); + private Set<Label> labels = new HashSet<>(); + private Set<Funnel> funnels = new HashSet<>(); + private Set<ControllerServiceNode> controllerServices = new HashSet<>(); + + public Builder addProcessGroup(ProcessGroup processGroup) { + this.processGroups.add(processGroup); + return this; + } + + public Builder addRemoteProcessGroup(RemoteProcessGroup remoteProcessGroup) { + this.remoteProcessGroups.add(remoteProcessGroup); + return this; + } + + public Builder addProcessor(ProcessorNode processor) { + this.processors.add(processor); + return this; + } + + public Builder addInputPort(Port inputPort) { + this.inputPorts.add(inputPort); + return this; + } + + public Builder addOutputPort(Port outputPort) { + this.outputPorts.add(outputPort); + return this; + } + + public Builder addConnection(Connection connection) { + this.connections.add(connection); + return this; + } + + public Builder addLabel(Label label) { + this.labels.add(label); + return this; + } + + public Builder addFunnel(Funnel funnel) { + this.funnels.add(funnel); + return this; + } + + public Builder addControllerService(ControllerServiceNode controllerService) { + this.controllerServices.add(controllerService); + return this; + } + + public ComponentAdditions build() { + return new ComponentAdditions(this); + } + } +}
nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java+9 −0 modified@@ -908,6 +908,15 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi */ void move(final Snippet snippet, final ProcessGroup destination); + /** + * Adds the versioned component additions to this Process Group. + * + * @param additions the components to add + * @param componentIdSeed a seed value to use when generating ID's for new components + * @return the component additions + */ + ComponentAdditions addVersionedComponents(VersionedComponentAdditions additions, String componentIdSeed); + /** * Updates the Process Group to match the proposed flow *
nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/VersionedComponentAdditions.java+179 −0 added@@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.groups; + +import org.apache.nifi.flow.ParameterProviderReference; +import org.apache.nifi.flow.VersionedConnection; +import org.apache.nifi.flow.VersionedControllerService; +import org.apache.nifi.flow.VersionedFunnel; +import org.apache.nifi.flow.VersionedLabel; +import org.apache.nifi.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedPort; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flow.VersionedProcessor; +import org.apache.nifi.flow.VersionedRemoteProcessGroup; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +public class VersionedComponentAdditions { + + private Set<VersionedProcessGroup> processGroups; + private Set<VersionedRemoteProcessGroup> remoteProcessGroups; + private Set<VersionedProcessor> processors; + private Set<VersionedPort> inputPorts; + private Set<VersionedPort> outputPorts; + private Set<VersionedConnection> connections; + private Set<VersionedLabel> labels; + private Set<VersionedFunnel> funnels; + private Set<VersionedControllerService> controllerServices; + private Map<String, VersionedParameterContext> parameterContexts; + private Map<String, ParameterProviderReference> parameterProviders; + + private VersionedComponentAdditions(final Builder builder) { + this.processGroups = builder.processGroups == null ? Collections.emptySet() : Collections.unmodifiableSet(builder.processGroups); + this.remoteProcessGroups = builder.remoteProcessGroups == null ? Collections.emptySet() : Collections.unmodifiableSet(builder.remoteProcessGroups); + this.processors = builder.processors == null ? Collections.emptySet() : Collections.unmodifiableSet(builder.processors); + this.inputPorts = builder.inputPorts == null ? Collections.emptySet() : Collections.unmodifiableSet(builder.inputPorts); + this.outputPorts = builder.outputPorts == null ? Collections.emptySet() : Collections.unmodifiableSet(builder.outputPorts); + this.connections = builder.connections == null ? Collections.emptySet() : Collections.unmodifiableSet(builder.connections); + this.labels = builder.labels == null ? Collections.emptySet() : Collections.unmodifiableSet(builder.labels); + this.funnels = builder.funnels == null ? Collections.emptySet() : Collections.unmodifiableSet(builder.funnels); + this.controllerServices = builder.controllerServices == null ? Collections.emptySet() : Collections.unmodifiableSet(builder.controllerServices); + this.parameterContexts = builder.parameterContexts == null ? Collections.emptyMap() : Collections.unmodifiableMap(builder.parameterContexts); + this.parameterProviders = builder.parameterProviders == null ? Collections.emptyMap() : Collections.unmodifiableMap(builder.parameterProviders); + } + + public Set<VersionedProcessGroup> getProcessGroups() { + return processGroups; + } + + public Set<VersionedRemoteProcessGroup> getRemoteProcessGroups() { + return remoteProcessGroups; + } + + public Set<VersionedProcessor> getProcessors() { + return processors; + } + + public Set<VersionedPort> getInputPorts() { + return inputPorts; + } + + public Set<VersionedPort> getOutputPorts() { + return outputPorts; + } + + public Set<VersionedConnection> getConnections() { + return connections; + } + + public Set<VersionedLabel> getLabels() { + return labels; + } + + public Set<VersionedFunnel> getFunnels() { + return funnels; + } + + public Set<VersionedControllerService> getControllerServices() { + return controllerServices; + } + + public Map<String, VersionedParameterContext> getParameterContexts() { + return parameterContexts; + } + + public Map<String, ParameterProviderReference> getParameterProviders() { + return parameterProviders; + } + + public static class Builder { + private Set<VersionedProcessGroup> processGroups; + private Set<VersionedRemoteProcessGroup> remoteProcessGroups; + private Set<VersionedProcessor> processors; + private Set<VersionedPort> inputPorts; + private Set<VersionedPort> outputPorts; + private Set<VersionedConnection> connections; + private Set<VersionedLabel> labels; + private Set<VersionedFunnel> funnels; + private Set<VersionedControllerService> controllerServices; + private Map<String, VersionedParameterContext> parameterContexts; + private Map<String, ParameterProviderReference> parameterProviders; + + public Builder setProcessGroups(Set<VersionedProcessGroup> processGroups) { + this.processGroups = processGroups; + return this; + } + + public Builder setRemoteProcessGroups(Set<VersionedRemoteProcessGroup> remoteProcessGroups) { + this.remoteProcessGroups = remoteProcessGroups; + return this; + } + + public Builder setProcessors(Set<VersionedProcessor> processors) { + this.processors = processors; + return this; + } + + public Builder setInputPorts(Set<VersionedPort> inputPorts) { + this.inputPorts = inputPorts; + return this; + } + + public Builder setOutputPorts(Set<VersionedPort> outputPorts) { + this.outputPorts = outputPorts; + return this; + } + + public Builder setConnections(Set<VersionedConnection> connections) { + this.connections = connections; + return this; + } + + public Builder setLabels(Set<VersionedLabel> labels) { + this.labels = labels; + return this; + } + + public Builder setFunnels(Set<VersionedFunnel> funnels) { + this.funnels = funnels; + return this; + } + + public Builder setControllerServices(Set<VersionedControllerService> controllerServices) { + this.controllerServices = controllerServices; + return this; + } + + public Builder setParameterContexts(Map<String, VersionedParameterContext> parameterContexts) { + this.parameterContexts = parameterContexts; + return this; + } + + public Builder setParameterProviders(Map<String, ParameterProviderReference> parameterProviders) { + this.parameterProviders = parameterProviders; + return this; + } + + public VersionedComponentAdditions build() { + return new VersionedComponentAdditions(this); + } + } +}
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java+7 −0 modified@@ -35,6 +35,7 @@ import org.apache.nifi.flow.ExecutionEngine; import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.groups.BatchCounts; +import org.apache.nifi.groups.ComponentAdditions; import org.apache.nifi.groups.DataValve; import org.apache.nifi.groups.FlowFileConcurrency; import org.apache.nifi.groups.FlowFileGate; @@ -46,6 +47,7 @@ import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.StatelessGroupNode; import org.apache.nifi.groups.StatelessGroupScheduledState; +import org.apache.nifi.groups.VersionedComponentAdditions; import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.parameter.ParameterUpdate; import org.apache.nifi.registry.flow.FlowLocation; @@ -704,6 +706,11 @@ public void verifyCanSaveToFlowRegistry(String registryId, FlowLocation flowLoca public void synchronizeWithFlowRegistry(FlowManager flowRegistry) { } + @Override + public ComponentAdditions addVersionedComponents(VersionedComponentAdditions additions, String componentIdSeed) { + return new ComponentAdditions.Builder().build(); + } + @Override public void updateFlow(VersionedExternalFlow proposedFlow, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings, boolean updateDescendantVerisonedFlows) { }
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/AuthorizableLookup.java+27 −0 modified@@ -22,6 +22,9 @@ import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.web.api.dto.BundleDTO; +import java.util.Set; +import java.util.function.Predicate; + public interface AuthorizableLookup { /** @@ -134,6 +137,13 @@ public interface AuthorizableLookup { */ ConnectionAuthorizable getConnection(String id); + /** + * Get the authorizable root ProcessGroup. + * + * @return authorizable + */ + ProcessGroupAuthorizable getRootProcessGroup(); + /** * Get the authorizable ProcessGroup. * @@ -166,6 +176,15 @@ public interface AuthorizableLookup { */ Authorizable getFunnel(String id); + /** + * Get the authorizables for all controller services that meet the specified predicate. Non null + * + * @param groupId the group id + * @param filter the filter + * @return all encapsulated controller services + */ + Set<ComponentAuthorizable> getControllerServices(String groupId, Predicate<org.apache.nifi.authorization.resource.VersionedComponentAuthorizable> filter); + /** * Get the authorizable ControllerService. * @@ -199,6 +218,14 @@ public interface AuthorizableLookup { */ ComponentAuthorizable getFlowAnalysisRule(String id); + /** + * Get the authorizables for all parameter providers that meet the specified predicate. Non null + * + * @param filter predicate to filter which parameter providers should be included + * @return all parameter providers that meet the specified filter + */ + Set<ComponentAuthorizable> getParameterProviders(Predicate<org.apache.nifi.authorization.resource.ComponentAuthorizable> filter); + /** * Get the authorizable ParameterProvider. *
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/AuthorizeControllerServiceReference.java+37 −0 modified@@ -30,6 +30,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; /** * Authorizes references to Controller Services. Utilizes when Processors, Controller Services, and Reporting Tasks are created and updated. @@ -164,4 +165,40 @@ private static void authorize(Authorizer authorizer, AuthorizableLookup lookup, serviceAuthorizable.authorize(authorizer, RequestAction.READ, user); }); } + + /** + * Unresolved Controller Services may be unresolved because + * - The identifier matches an existing Controller Service id (unresolved because the proposed id was unchanged from resolved id) + * - An applicable Controller Service does not exist + * - The user lacks permission to an applicable Controller Service + * + * If any of those unresolved Controller Services do match a local Controller Service we need to authorize access to it. + * + * @param groupId the group id + * @param unresolvedControllerServices the unresolved Controller Services + * @param authorizer the Authorizer + * @param lookup the AuthorizableLookup + * @param user the current user + */ + public static void authorizeUnresolvedControllerServiceReferences(final String groupId, final Set<String> unresolvedControllerServices, + final Authorizer authorizer, final AuthorizableLookup lookup, final NiFiUser user) { + + // if there are no unresolved Controller Services we can return + if (unresolvedControllerServices.isEmpty()) { + return; + } + + lookup.getControllerServices(groupId, cs -> { + // if the unresolved controller service directly contains the id of a locally available service, include it for authorization + final boolean containsId = unresolvedControllerServices.contains(cs.getIdentifier()); + if (containsId) { + return true; + } + + // if the unresolved controller service contains the versioned id of a locally available service, include it for authorization + final Optional<String> versionedId = cs.getVersionedComponentId(); + return versionedId.filter(unresolvedControllerServices::contains).isPresent(); + }) + .forEach(ca -> ca.getAuthorizable().authorize(authorizer, RequestAction.READ, user)); + } }
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/AuthorizeParameterProviders.java+71 −0 added@@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.authorization; + +import org.apache.nifi.authorization.user.NiFiUser; + +import java.util.HashSet; +import java.util.Set; + +public class AuthorizeParameterProviders { + + /** + * Unresolved Parameter Providers may be unresolved because + * - The identifier matches an existing Parameter Provider id (unresolved because the proposed id was unchanged from resolved id) + * - An applicable Parameter Provider does not exist + * - The user lacks permission to an applicable Parameter Provider + * + * If any of those unresolved Parameter Providers do match a local Parameter Provider we need to authorize access to it. If + * there are unresolved Parameter Providers that do not match a local Parameter Provider we need to ensure the user has + * permissions to create one. + * + * @param unresolvedParameterProviders the unresolved Parameter Providers + * @param authorizer the Authorizer + * @param lookup the AuthorizableLookup + * @param user the current user + */ + public static void authorizeUnresolvedParameterProviders(final Set<String> unresolvedParameterProviders, final Authorizer authorizer, + final AuthorizableLookup lookup, final NiFiUser user) { + + // if there are no unresolved Parameter Providers we can return + if (unresolvedParameterProviders.isEmpty()) { + return; + } + + // unresolved parameter providers may be unresolved because + // - the identifier matches an existing parameter provider id (unresolved because proposed id was unchanged from resolved id) + // - an applicable parameter provider does not exist + // - the user lacks permission to an applicable parameter provider + // for everything unresolved, if it matches an existing parameter provider we need to authorize it. + final Set<String> unknownParameterProviders = new HashSet<>(unresolvedParameterProviders); + lookup.getParameterProviders(pp -> { + // remove this identifier from unknown/unresolved since it actually exists locally + unknownParameterProviders.remove(pp.getIdentifier()); + + // if this unresolved parameter provider matches an existing parameter provider it + // means the request payload already referenced a local parameter provider so we need + // to ensure the user has permissions to read it + return unresolvedParameterProviders.contains(pp.getIdentifier()); + }).forEach(ca -> ca.getAuthorizable().authorize(authorizer, RequestAction.READ, user)); + + // if there are remaining unknown parameter providers a new parameter provider will be created, + // and we need to ensure the user has those permissions too + if (!unknownParameterProviders.isEmpty()) { + lookup.getController().authorize(authorizer, RequestAction.WRITE, user); + } + } +}
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/ProcessGroupAuthorizable.java+27 −2 modified@@ -19,7 +19,9 @@ import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.groups.ProcessGroup; +import java.util.Optional; import java.util.Set; +import java.util.function.Predicate; /** * Authorizable for a ProcessGroup and its encapsulated components. @@ -32,13 +34,28 @@ public interface ProcessGroupAuthorizable extends AuthorizableHolder { */ ProcessGroup getProcessGroup(); + /** + * Returns the optional Parameter Context Authorizable. + * + * @return the Parameter Context authorizable + */ + Optional<Authorizable> getParameterContextAuthorizable(); + /** * The authorizables for all encapsulated processors. Non null * * @return all encapsulated processors */ Set<ComponentAuthorizable> getEncapsulatedProcessors(); + /** + * The authorizables for all encapsulated processors that meet the specified predicate. Non null + * + * @param filter predicate to filter which processors should be included + * @return all encapsulated processors that meet the specified predicate + */ + Set<ComponentAuthorizable> getEncapsulatedProcessors(Predicate<org.apache.nifi.authorization.resource.ComponentAuthorizable> filter); + /** * The authorizables for all encapsulated connections. Non null * @@ -89,10 +106,18 @@ public interface ProcessGroupAuthorizable extends AuthorizableHolder { Set<Authorizable> getEncapsulatedRemoteProcessGroups(); /** - * The authorizables for all encapsulated input ports. Non null + * The authorizables for all encapsulated controller services. Non null * - * @return all encapsulated input ports + * @return all encapsulated controller services */ Set<ComponentAuthorizable> getEncapsulatedControllerServices(); + /** + * The authorizables for all encapsulated controller services that meet the specified predicate. Non null + * + * @param filter predicate to filter which controller services should be included + * @return all controller services that meet the specified predicate + */ + Set<ComponentAuthorizable> getEncapsulatedControllerServices(Predicate<org.apache.nifi.authorization.resource.ComponentAuthorizable> filter); + }
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/StandardAuthorizableLookup.java+67 −0 modified@@ -28,6 +28,7 @@ import org.apache.nifi.authorization.resource.ResourceType; import org.apache.nifi.authorization.resource.RestrictedComponentsAuthorizableFactory; import org.apache.nifi.authorization.resource.TenantAuthorizable; +import org.apache.nifi.authorization.resource.VersionedComponentAuthorizable; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.ConfigurableComponent; @@ -76,7 +77,9 @@ import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; +import java.util.function.Predicate; import java.util.stream.Collectors; @Component @@ -294,6 +297,11 @@ public ConnectionAuthorizable getConnection(final String id) { return new StandardConnectionAuthorizable(connection); } + @Override + public ProcessGroupAuthorizable getRootProcessGroup() { + return getProcessGroup(controllerFacade.getRootGroupId()); + } + @Override public ProcessGroupAuthorizable getProcessGroup(final String id) { final ProcessGroup processGroup = processGroupDAO.getProcessGroup(id); @@ -315,6 +323,39 @@ public Authorizable getFunnel(final String id) { return funnelDAO.getFunnel(id); } + @Override + public Set<ComponentAuthorizable> getControllerServices(final String groupId, final Predicate<org.apache.nifi.authorization.resource.VersionedComponentAuthorizable> filter) { + return controllerServiceDAO.getControllerServices(groupId, true, false).stream() + .filter(cs -> filter.test(new VersionedComponentAuthorizable() { + @Override + public Optional<String> getVersionedComponentId() { + return cs.getVersionedComponentId(); + } + + @Override + public String getIdentifier() { + return cs.getIdentifier(); + } + + @Override + public String getProcessGroupIdentifier() { + return cs.getProcessGroupIdentifier(); + } + + @Override + public Authorizable getParentAuthorizable() { + return cs.getParentAuthorizable(); + } + + @Override + public Resource getResource() { + return cs.getResource(); + } + }) + ) + .map(controllerServiceNode -> new ControllerServiceComponentAuthorizable(controllerServiceNode, controllerFacade.getExtensionManager())).collect(Collectors.toSet()); + } + @Override public ComponentAuthorizable getControllerService(final String id) { final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(id); @@ -391,6 +432,13 @@ public ComponentAuthorizable getFlowAnalysisRule(final String id) { return new FlowAnalysisRuleComponentAuthorizable(flowAnalysisRuleNode, controllerFacade.getExtensionManager()); } + @Override + public Set<ComponentAuthorizable> getParameterProviders(final Predicate<org.apache.nifi.authorization.resource.ComponentAuthorizable> filter) { + return parameterProviderDAO.getParameterProviders().stream() + .filter(filter) + .map(parameterProviderNode -> new ParameterProviderComponentAuthorizable(parameterProviderNode, controllerFacade.getExtensionManager())).collect(Collectors.toSet()); + } + @Override public ComponentAuthorizable getParameterProvider(final String id) { final ParameterProviderNode parameterProviderNode = parameterProviderDAO.getParameterProvider(id); @@ -1210,6 +1258,11 @@ public Authorizable getAuthorizable() { return processGroup; } + @Override + public Optional<Authorizable> getParameterContextAuthorizable() { + return Optional.ofNullable(processGroup.getParameterContext()); + } + @Override public ProcessGroup getProcessGroup() { return processGroup; @@ -1221,6 +1274,13 @@ public Set<ComponentAuthorizable> getEncapsulatedProcessors() { processorNode -> new ProcessorComponentAuthorizable(processorNode, extensionManager)).collect(Collectors.toSet()); } + @Override + public Set<ComponentAuthorizable> getEncapsulatedProcessors(Predicate<org.apache.nifi.authorization.resource.ComponentAuthorizable> processorFilter) { + return processGroup.findAllProcessors().stream() + .filter(processorFilter) + .map(processorNode -> new ProcessorComponentAuthorizable(processorNode, extensionManager)).collect(Collectors.toSet()); + } + @Override public Set<ConnectionAuthorizable> getEncapsulatedConnections() { return processGroup.findAllConnections().stream().map( @@ -1263,6 +1323,13 @@ public Set<ComponentAuthorizable> getEncapsulatedControllerServices() { return processGroup.findAllControllerServices().stream().map( controllerServiceNode -> new ControllerServiceComponentAuthorizable(controllerServiceNode, extensionManager)).collect(Collectors.toSet()); } + + @Override + public Set<ComponentAuthorizable> getEncapsulatedControllerServices(Predicate<org.apache.nifi.authorization.resource.ComponentAuthorizable> serviceFilter) { + return processGroup.findAllControllerServices().stream() + .filter(serviceFilter) + .map(controllerServiceNode -> new ControllerServiceComponentAuthorizable(controllerServiceNode, extensionManager)).collect(Collectors.toSet()); + } } private static class StandardConnectionAuthorizable implements ConnectionAuthorizable {
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java+30 −6 modified@@ -206,11 +206,20 @@ protected ResponseBuilder noCache(final ResponseBuilder response) { } protected String generateUuid() { + return generateUuid(null); + } + + protected String generateUuid(final String currentId) { final Optional<String> seed = getIdGenerationSeed(); UUID uuid; if (seed.isPresent()) { try { - UUID seedId = UUID.fromString(seed.get()); + final UUID seedId; + if (currentId == null) { + seedId = UUID.fromString(seed.get()); + } else { + seedId = UUID.nameUUIDFromBytes((currentId + seed.get()).getBytes(StandardCharsets.UTF_8)); + } uuid = new UUID(seedId.getMostSignificantBits(), seed.get().hashCode()); } catch (Exception e) { logger.warn("Provided 'seed' does not represent UUID. Will not be able to extract most significant bits for ID generation."); @@ -426,19 +435,25 @@ protected void authorizeRestrictions(final Authorizer authorizer, final Componen * @param authorizeReferencedServices whether to authorize referenced services * @param authorizeControllerServices whether to authorize controller services * @param authorizeTransitiveServices whether to authorize transitive services - * @param authorizeParameterReferences whether to authorize parameter references + * @param authorizeParameterReferences whether to authorize parameter context that contained referenced parameter if applicable + * @param authorizeParameterContext whether to authorize the bound parameter context if applicable */ protected void authorizeProcessGroup(final ProcessGroupAuthorizable processGroupAuthorizable, final Authorizer authorizer, final AuthorizableLookup lookup, final RequestAction action, final boolean authorizeReferencedServices, final boolean authorizeControllerServices, final boolean authorizeTransitiveServices, - final boolean authorizeParameterReferences) { + final boolean authorizeParameterReferences, final boolean authorizeParameterContext) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); final Consumer<Authorizable> authorize = authorizable -> authorizable.authorize(authorizer, action, user); // authorize the process group authorize.accept(processGroupAuthorizable.getAuthorizable()); + // authorize the parameter context for the specified process group + if (authorizeParameterContext) { + processGroupAuthorizable.getParameterContextAuthorizable().ifPresent(authorize); + } + // authorize the contents of the group - these methods return all encapsulated components (recursive) processGroupAuthorizable.getEncapsulatedProcessors().forEach(processorAuthorizable -> { // authorize the processor @@ -459,7 +474,15 @@ protected void authorizeProcessGroup(final ProcessGroupAuthorizable processGroup processGroupAuthorizable.getEncapsulatedOutputPorts().forEach(authorize); processGroupAuthorizable.getEncapsulatedFunnels().forEach(authorize); processGroupAuthorizable.getEncapsulatedLabels().forEach(authorize); - processGroupAuthorizable.getEncapsulatedProcessGroups().stream().map(group -> group.getAuthorizable()).forEach(authorize); + processGroupAuthorizable.getEncapsulatedProcessGroups().forEach(pga -> { + final Authorizable authorizable = pga.getAuthorizable(); + + authorize.accept(authorizable); + + if (authorizeParameterContext) { + pga.getParameterContextAuthorizable().ifPresent(authorize); + } + }); processGroupAuthorizable.getEncapsulatedRemoteProcessGroups().forEach(authorize); // authorize controller services if necessary @@ -488,7 +511,8 @@ protected void authorizeProcessGroup(final ProcessGroupAuthorizable processGroup * @param action action */ protected void authorizeSnippet(final SnippetAuthorizable snippet, final Authorizer authorizer, final AuthorizableLookup lookup, final RequestAction action, - final boolean authorizeReferencedServices, final boolean authorizeTransitiveServices, final boolean authorizeParameterReferences) { + final boolean authorizeReferencedServices, final boolean authorizeTransitiveServices, final boolean authorizeParameterReferences, + final boolean authorizeParameterContext) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); final Consumer<Authorizable> authorize = authorizable -> authorizable.authorize(authorizer, action, user); @@ -498,7 +522,7 @@ protected void authorizeSnippet(final SnippetAuthorizable snippet, final Authori // note - we are not authorizing controller services as they are not considered when using this snippet. however, // referenced services are considered so those are explicitly authorized when authorizing a processor authorizeProcessGroup(processGroupAuthorizable, authorizer, lookup, action, authorizeReferencedServices, - false, authorizeTransitiveServices, authorizeParameterReferences); + false, authorizeTransitiveServices, authorizeParameterReferences, authorizeParameterContext); }); snippet.getSelectedRemoteProcessGroups().forEach(authorize); snippet.getSelectedProcessors().forEach(processorAuthorizable -> {
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java+2 −1 modified@@ -219,8 +219,9 @@ public ControllerConfigurationEntity createControllerConfigurationEntity(final C return entity; } - public ProcessGroupFlowEntity createProcessGroupFlowEntity(final ProcessGroupFlowDTO dto, final PermissionsDTO permissions) { + public ProcessGroupFlowEntity createProcessGroupFlowEntity(final ProcessGroupFlowDTO dto, final RevisionDTO revision, final PermissionsDTO permissions) { final ProcessGroupFlowEntity entity = new ProcessGroupFlowEntity(); + entity.setRevision(revision); entity.setProcessGroupFlow(dto); entity.setPermissions(permissions); return entity;
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java+15 −6 modified@@ -17,6 +17,8 @@ package org.apache.nifi.web.api; import org.apache.nifi.authorization.AuthorizableLookup; +import org.apache.nifi.authorization.AuthorizeControllerServiceReference; +import org.apache.nifi.authorization.AuthorizeParameterProviders; import org.apache.nifi.authorization.AuthorizeParameterReference; import org.apache.nifi.authorization.Authorizer; import org.apache.nifi.authorization.ComponentAuthorizable; @@ -193,10 +195,10 @@ protected Response initiateFlowUpdate(final String groupId, final T requestEntit serviceFacade.discoverCompatibleBundles(flowSnapshot.getFlowContents()); // If there are any Controller Services referenced that are inherited from the parent group, resolve those to point to the appropriate Controller Service, if we are able to. - serviceFacade.resolveInheritedControllerServices(flowSnapshotContainer, groupId, user); + final Set<String> unresolvedControllerServices = serviceFacade.resolveInheritedControllerServices(flowSnapshotContainer, groupId, user); // If there are any Parameter Providers referenced by Parameter Contexts, resolve these to point to the appropriate Parameter Provider, if we are able to. - serviceFacade.resolveParameterProviders(flowSnapshot, user); + final Set<String> unresolvedParameterProviders = serviceFacade.resolveParameterProviders(flowSnapshot, user); // Step 1: Determine which components will be affected by updating the flow final Set<AffectedComponentEntity> affectedComponents = serviceFacade.getComponentsAffectedByFlowUpdate(groupId, flowSnapshot); @@ -211,7 +213,7 @@ protected Response initiateFlowUpdate(final String groupId, final T requestEntit serviceFacade, requestWrapper, requestRevision, - lookup -> authorizeFlowUpdate(lookup, user, groupId, flowSnapshot), + lookup -> authorizeFlowUpdate(lookup, user, groupId, flowSnapshot, unresolvedControllerServices, unresolvedParameterProviders), () -> { // Step 3: Verify that all components in the snapshot exist on all nodes // Step 4: Verify that Process Group can be updated. Only versioned flows care about the verifyNotDirty flag @@ -230,13 +232,14 @@ protected Response initiateFlowUpdate(final String groupId, final T requestEntit * @param flowSnapshot the new flow contents to examine for restricted components */ protected void authorizeFlowUpdate(final AuthorizableLookup lookup, final NiFiUser user, final String groupId, - final RegisteredFlowSnapshot flowSnapshot) { + final RegisteredFlowSnapshot flowSnapshot, final Set<String> unresolvedControllerServices, + final Set<String> unresolvedParameterProviders) { // Step 2: Verify READ and WRITE permissions for user, for every component. final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId); authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, - false, true, true); + false, true, false, true); authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.WRITE, true, - false, true, false); + false, true, false, false); final VersionedProcessGroup groupContents = flowSnapshot.getFlowContents(); final Set<ConfigurableComponent> restrictedComponents = FlowRegistryUtils.getRestrictedComponents(groupContents, serviceFacade); @@ -251,6 +254,12 @@ protected void authorizeFlowUpdate(final AuthorizableLookup lookup, final NiFiUs context -> AuthorizeParameterReference.authorizeParameterContextAddition(context, serviceFacade, authorizer, lookup, user) ); } + + // authorize parameter providers + AuthorizeParameterProviders.authorizeUnresolvedParameterProviders(unresolvedParameterProviders, authorizer, lookup, user); + + // authorizer controller services + AuthorizeControllerServiceReference.authorizeUnresolvedControllerServiceReferences(groupId, unresolvedControllerServices, authorizer, lookup, user); } /**
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java+451 −14 modified@@ -32,6 +32,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.stream.Collectors; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; @@ -60,6 +61,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.AuthorizableLookup; import org.apache.nifi.authorization.AuthorizeControllerServiceReference; +import org.apache.nifi.authorization.AuthorizeParameterProviders; import org.apache.nifi.authorization.AuthorizeParameterReference; import org.apache.nifi.authorization.ComponentAuthorizable; import org.apache.nifi.authorization.ConnectionAuthorizable; @@ -72,10 +74,14 @@ import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.connectable.ConnectableType; +import org.apache.nifi.flow.ConnectableComponent; import org.apache.nifi.flow.ExecutionEngine; +import org.apache.nifi.flow.VersionedComponent; import org.apache.nifi.flow.VersionedFlowCoordinates; import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flow.VersionedPropertyDescriptor; +import org.apache.nifi.groups.VersionedComponentAdditions; import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.registry.client.NiFiRegistryException; import org.apache.nifi.registry.flow.FlowRegistryBucket; @@ -107,6 +113,9 @@ import org.apache.nifi.web.api.entity.ConnectionEntity; import org.apache.nifi.web.api.entity.ConnectionsEntity; import org.apache.nifi.web.api.entity.ControllerServiceEntity; +import org.apache.nifi.web.api.entity.CopyRequestEntity; +import org.apache.nifi.web.api.entity.CopyResponseEntity; +import org.apache.nifi.web.api.entity.PasteRequestEntity; import org.apache.nifi.web.api.entity.CopySnippetRequestEntity; import org.apache.nifi.web.api.entity.DropRequestEntity; import org.apache.nifi.web.api.entity.Entity; @@ -119,6 +128,7 @@ import org.apache.nifi.web.api.entity.LabelsEntity; import org.apache.nifi.web.api.entity.OutputPortsEntity; import org.apache.nifi.web.api.entity.ParameterContextReferenceEntity; +import org.apache.nifi.web.api.entity.PasteResponseEntity; import org.apache.nifi.web.api.entity.PortEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; import org.apache.nifi.web.api.entity.ProcessGroupImportEntity; @@ -310,7 +320,7 @@ public Response exportProcessGroup( // ensure access to process groups (nested), encapsulated controller services and referenced parameter contexts final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId); authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, - false, false, true); + false, false, false, true); }); // get the versioned flow @@ -326,6 +336,89 @@ public Response exportProcessGroup( return generateOkResponse(currentVersionedFlowSnapshot).header(HttpHeaders.CONTENT_DISPOSITION, String.format("attachment; filename=\"%s\"", filename)).build(); } + /** + * Generates a copy response for the given copy request. + * + * @param groupId The id of the process group + * @param copyRequestEntity The copy request + * @return A copyResponseEntity. + */ + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Path("{id}/copy") + @Operation( + summary = "Generates a copy response for the given copy request", + responses = @ApiResponse(content = @Content(schema = @Schema(implementation = CopyResponseEntity.class))), + security = { + @SecurityRequirement(name = "Read - /{component-type}/{uuid} - For all encapsulated components") + } + ) + @ApiResponses( + value = { + @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(responseCode = "401", description = "Client could not be authenticated."), + @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), + @ApiResponse(responseCode = "404", description = "The specified resource could not be found."), + @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") + } + ) + public Response copy( + @Parameter( + description = "The process group id.", + required = true + ) + @PathParam("id") final String groupId, + @Parameter( + description = "The request including the components to be copied from the specified Process Group.", + required = true + ) final CopyRequestEntity copyRequestEntity) { + + if (copyRequestEntity == null) { + throw new IllegalArgumentException("The copy request payload must be specified."); + } + + // authorize access + serviceFacade.authorizeAccess(lookup -> { + copyRequestEntity.getProcessors().forEach(id -> { + final Authorizable authorizable = lookup.getProcessor(id).getAuthorizable(); + authorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + }); + copyRequestEntity.getInputPorts().forEach(id -> { + final Authorizable authorizable = lookup.getInputPort(id); + authorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + }); + copyRequestEntity.getOutputPorts().forEach(id -> { + final Authorizable authorizable = lookup.getOutputPort(id); + authorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + }); + copyRequestEntity.getProcessGroups().forEach(id -> { + final ProcessGroupAuthorizable processGroupAuthorizable = lookup.getProcessGroup(id); + authorizeProcessGroup(processGroupAuthorizable, authorizer, lookup, RequestAction.READ, true, true, false, false, true); + }); + copyRequestEntity.getRemoteProcessGroups().forEach(id -> { + final Authorizable authorizable = lookup.getRemoteProcessGroup(id); + authorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + }); + copyRequestEntity.getFunnels().forEach(id -> { + final Authorizable authorizable = lookup.getFunnel(id); + authorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + }); + copyRequestEntity.getLabels().forEach(id -> { + final Authorizable authorizable = lookup.getLabel(id); + authorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + }); + copyRequestEntity.getConnections().forEach(id -> { + final Authorizable authorizable = lookup.getConnection(id).getAuthorizable(); + authorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + }); + }); + + // copy the components + final CopyResponseEntity copyResponseEntity = serviceFacade.copyComponents(groupId, copyRequestEntity); + return generateOkResponse(copyResponseEntity).build(); + } + /** * Retrieves a list of local modifications to the Process Group since it was last synchronized with the Flow Registry * @@ -360,7 +453,7 @@ public Response getLocalModifications( // authorize access serviceFacade.authorizeAccess(lookup -> { final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId); - authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, false, false, false, false); + authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, false, false, false, false, false); }); final FlowComparisonEntity entity = serviceFacade.getLocalModifications(groupId); @@ -762,10 +855,10 @@ public Response removeDropRequest( private void authorizeHandleDropAllFlowFilesRequest(String processGroupId, AuthorizableLookup lookup) { final ProcessGroupAuthorizable processGroup = lookup.getProcessGroup(processGroupId); - authorizeProcessGroup(processGroup, authorizer, lookup, RequestAction.READ, false, false, false, false); + authorizeProcessGroup(processGroup, authorizer, lookup, RequestAction.READ, false, false, false, false, false); processGroup.getEncapsulatedProcessGroups() - .forEach(encapsulatedProcessGroup -> authorizeProcessGroup(encapsulatedProcessGroup, authorizer, lookup, RequestAction.READ, false, false, false, false)); + .forEach(encapsulatedProcessGroup -> authorizeProcessGroup(encapsulatedProcessGroup, authorizer, lookup, RequestAction.READ, false, false, false, false, false)); processGroup.getEncapsulatedConnections().stream() .map(ConnectionAuthorizable::getSourceData) @@ -834,7 +927,7 @@ public Response removeProcessGroup( // ensure write to this process group and all encapsulated components including controller services. additionally, ensure // read to any referenced services by encapsulated components - authorizeProcessGroup(processGroupAuthorizable, authorizer, lookup, RequestAction.WRITE, true, true, false, false); + authorizeProcessGroup(processGroupAuthorizable, authorizer, lookup, RequestAction.WRITE, true, true, false, false, false); // ensure write permission to the parent process group, if applicable... if this is the root group the // request will fail later but still need to handle authorization here @@ -940,6 +1033,8 @@ public Response createProcessGroup( // for write access to the RestrictedComponents resource // Step 6: Replicate the request or call serviceFacade.updateProcessGroup + final Set<String> unresolvedControllerServices = new HashSet<>(); + final Set<String> unresolvedParameterProviders = new HashSet<>(); final VersionControlInformationDTO versionControlInfo = requestProcessGroupEntity.getComponent().getVersionControlInformation(); if (versionControlInfo != null && requestProcessGroupEntity.getVersionedFlowSnapshot() == null) { // Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail. @@ -967,10 +1062,10 @@ public Response createProcessGroup( serviceFacade.discoverCompatibleBundles(flowSnapshot.getFlowContents()); // If there are any Controller Services referenced that are inherited from the parent group, resolve those to point to the appropriate Controller Service, if we are able to. - serviceFacade.resolveInheritedControllerServices(flowSnapshotContainer, groupId, NiFiUserUtils.getNiFiUser()); + unresolvedControllerServices.addAll(serviceFacade.resolveInheritedControllerServices(flowSnapshotContainer, groupId, NiFiUserUtils.getNiFiUser())); // If there are any Parameter Providers referenced by Parameter Contexts, resolve these to point to the appropriate Parameter Provider, if we are able to. - serviceFacade.resolveParameterProviders(flowSnapshot, NiFiUserUtils.getNiFiUser()); + unresolvedParameterProviders.addAll(serviceFacade.resolveParameterProviders(flowSnapshot, NiFiUserUtils.getNiFiUser())); // Step 6: Update contents of the ProcessGroupDTO passed in to include the components that need to be added. requestProcessGroupEntity.setVersionedFlowSnapshot(flowSnapshot); @@ -991,7 +1086,16 @@ public Response createProcessGroup( return withWriteLock( serviceFacade, requestProcessGroupEntity, - lookup -> authorizeAccess(groupId, requestProcessGroupEntity, lookup), + lookup -> { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + authorizeAccess(groupId, requestProcessGroupEntity, lookup); + + // authorizer controller services + AuthorizeControllerServiceReference.authorizeUnresolvedControllerServiceReferences(groupId, unresolvedControllerServices, authorizer, lookup, user); + + // authorize parameter providers + AuthorizeParameterProviders.authorizeUnresolvedParameterProviders(unresolvedParameterProviders, authorizer, lookup, user); + }, () -> { final RegisteredFlowSnapshot versionedFlowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot(); if (versionedFlowSnapshot != null) { @@ -2313,7 +2417,7 @@ public Response copySnippet( requestCopySnippetEntity, lookup -> { final NiFiUser user = NiFiUserUtils.getNiFiUser(); - final SnippetAuthorizable snippet = authorizeSnippetUsage(lookup, groupId, requestCopySnippetEntity.getSnippetId(), false, true); + final SnippetAuthorizable snippet = authorizeSnippetUsage(lookup, groupId, requestCopySnippetEntity.getSnippetId(), false, true, true); final Consumer<ComponentAuthorizable> authorizeRestricted = authorizable -> { if (authorizable.isRestricted()) { @@ -2367,7 +2471,8 @@ public Response copySnippet( } private SnippetAuthorizable authorizeSnippetUsage(final AuthorizableLookup lookup, final String groupId, final String snippetId, - final boolean authorizeTransitiveServices, final boolean authorizeParameterReferences) { + final boolean authorizeTransitiveServices, final boolean authorizeParameterReferences, + final boolean authorizeParameterContext) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); @@ -2376,7 +2481,7 @@ private SnippetAuthorizable authorizeSnippetUsage(final AuthorizableLookup looku // ensure read permission to every component in the snippet including referenced services final SnippetAuthorizable snippet = lookup.getSnippet(snippetId); - authorizeSnippet(snippet, authorizer, lookup, RequestAction.READ, true, authorizeTransitiveServices, authorizeParameterReferences); + authorizeSnippet(snippet, authorizer, lookup, RequestAction.READ, true, authorizeTransitiveServices, authorizeParameterReferences, authorizeParameterContext); return snippet; } @@ -2772,10 +2877,10 @@ public Response importProcessGroup( // if there are any Controller Services referenced that are inherited from the parent group, // resolve those to point to the appropriate Controller Service, if we are able to. final FlowSnapshotContainer flowSnapshotContainer = new FlowSnapshotContainer(versionedFlowSnapshot); - serviceFacade.resolveInheritedControllerServices(flowSnapshotContainer, groupId, NiFiUserUtils.getNiFiUser()); + final Set<String> unresolvedControllerServices = serviceFacade.resolveInheritedControllerServices(flowSnapshotContainer, groupId, NiFiUserUtils.getNiFiUser()); // If there are any Parameter Providers referenced by Parameter Contexts, resolve these to point to the appropriate Parameter Provider, if we are able to. - serviceFacade.resolveParameterProviders(versionedFlowSnapshot, NiFiUserUtils.getNiFiUser()); + final Set<String> unresolvedParameterProviders = serviceFacade.resolveParameterProviders(versionedFlowSnapshot, NiFiUserUtils.getNiFiUser()); if (isReplicateRequest()) { return replicate(HttpMethod.POST, processGroupUploadEntity); @@ -2789,7 +2894,16 @@ public Response importProcessGroup( return withWriteLock( serviceFacade, newProcessGroupEntity, - lookup -> authorizeAccess(groupId, newProcessGroupEntity, lookup), + lookup -> { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + authorizeAccess(groupId, newProcessGroupEntity, lookup); + + // authorizer controller services + AuthorizeControllerServiceReference.authorizeUnresolvedControllerServiceReferences(groupId, unresolvedControllerServices, authorizer, lookup, user); + + // authorize parameter providers + AuthorizeParameterProviders.authorizeUnresolvedParameterProviders(unresolvedParameterProviders, authorizer, lookup, user); + }, () -> { final RegisteredFlowSnapshot newVersionedFlowSnapshot = newProcessGroupEntity.getVersionedFlowSnapshot(); if (newVersionedFlowSnapshot != null) { @@ -2836,6 +2950,329 @@ public Response importProcessGroup( } + /** + * Pastes the specified payload into the given Process Group. + * + * @param pasteRequestEntity A PasteResponseEntity. + * @return A pasteResponseEntity. + */ + @PUT + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Path("{id}/paste") + @Operation( + summary = "Pastes into the specified process group", + responses = @ApiResponse(content = @Content(schema = @Schema(implementation = PasteResponseEntity.class))), + security = { + @SecurityRequirement(name = "Write - /process-groups/{uuid}") + } + ) + @ApiResponses( + value = { + @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(responseCode = "401", description = "Client could not be authenticated."), + @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), + @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") + } + ) + public Response paste( + @Parameter( + description = "The process group id.", + required = true + ) + @PathParam("id") final String groupId, + @Parameter( + description = "The request including the components to be pasted into the specified Process Group.", + required = true + ) final PasteRequestEntity pasteRequestEntity) { + + // verify the payload was specified + if (pasteRequestEntity == null) { + throw new IllegalArgumentException("The paste payload must be specified."); + } + + // verify the revision is specified + if (pasteRequestEntity.getRevision() == null) { + throw new IllegalArgumentException("Revision must be specified."); + } + + // verify the copy response is specified + if (pasteRequestEntity.getCopyResponse() == null) { + throw new IllegalArgumentException("The details of the copied components must be specified."); + } + + if (isReplicateRequest()) { + return replicate(HttpMethod.PUT, pasteRequestEntity); + } else if (isDisconnectedFromCluster()) { + verifyDisconnectedNodeModification(pasteRequestEntity.getDisconnectedNodeAcknowledged()); + } + + final CopyResponseEntity copyResponseEntity = pasteRequestEntity.getCopyResponse(); + final VersionedProcessGroup versionedProcessGroup = getVersionedProcessGroup(copyResponseEntity); + mapVersionedIds(versionedProcessGroup, new HashMap<>(), new HashMap<>()); + + // resolve Bundle info + serviceFacade.discoverCompatibleBundles(versionedProcessGroup); + + // prep a pasted flow snapshot to attempt to resolve external services and referenced parameter providers + final RegisteredFlowSnapshot pastedFlowSnapshot = new RegisteredFlowSnapshot(); + pastedFlowSnapshot.setExternalControllerServices(copyResponseEntity.getExternalControllerServiceReferences()); + pastedFlowSnapshot.setFlowContents(versionedProcessGroup); + pastedFlowSnapshot.setParameterContexts(copyResponseEntity.getParameterContexts()); + pastedFlowSnapshot.setParameterProviders(copyResponseEntity.getParameterProviders()); + + // if there are any Controller Services referenced that are inherited from the parent group, + // resolve those to point to the appropriate Controller Service, if we are able to. + final FlowSnapshotContainer flowSnapshotContainer = new FlowSnapshotContainer(pastedFlowSnapshot); + final Set<String> unresolvedControllerServices = serviceFacade.resolveInheritedControllerServices(flowSnapshotContainer, groupId, NiFiUserUtils.getNiFiUser()); + + // If there are any Parameter Providers referenced by Parameter Contexts, resolve these to point to the appropriate Parameter Provider, if we are able to. + final Set<String> unresolvedParameterProviders = serviceFacade.resolveParameterProviders(pastedFlowSnapshot, NiFiUserUtils.getNiFiUser()); + + final Revision requestRevision = getRevision(pasteRequestEntity.getRevision(), groupId); + return withWriteLock( + serviceFacade, + pasteRequestEntity, + requestRevision, + lookup -> { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + // ensure the user can write to the current group + final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); + processGroup.authorize(authorizer, RequestAction.WRITE, user); + + // if the pasted content contains restricted components, ensure the user is allowed those restrictions + final Set<ConfigurableComponent> restrictedComponents = FlowRegistryUtils.getRestrictedComponents(versionedProcessGroup, serviceFacade); + restrictedComponents.forEach(restrictedComponent -> { + final ComponentAuthorizable restrictedComponentAuthorizable = lookup.getConfigurableComponent(restrictedComponent); + authorizeRestrictions(authorizer, restrictedComponentAuthorizable); + }); + + // authorize controller services + AuthorizeControllerServiceReference.authorizeUnresolvedControllerServiceReferences(groupId, unresolvedControllerServices, authorizer, lookup, user); + + // if the pasted content contains parameter contexts, ensure the user can create them or add to existing matching contexts + final Map<String, VersionedParameterContext> parameterContexts = copyResponseEntity.getParameterContexts(); + if (parameterContexts != null) { + parameterContexts.values().forEach(context -> AuthorizeParameterReference.authorizeParameterContextAddition(context, serviceFacade, authorizer, lookup, user)); + } + + // authorize parameter providers + AuthorizeParameterProviders.authorizeUnresolvedParameterProviders(unresolvedParameterProviders, authorizer, lookup, user); + + // if the pasted content contains instance ids, ensure the user can read those instances since sensitive values will be copied over + authorizeInstanceIds(versionedProcessGroup, lookup); + }, + () -> serviceFacade.verifyComponentTypes(versionedProcessGroup), + (revision, requestPasteRequestEntity) -> { + final CopyResponseEntity requestCopyResponseEntity = requestPasteRequestEntity.getCopyResponse(); + + // prepare the request to add versioned components + final VersionedComponentAdditions additions = new VersionedComponentAdditions.Builder() + .setProcessors(requestCopyResponseEntity.getProcessors()) + .setInputPorts(requestCopyResponseEntity.getInputPorts()) + .setOutputPorts(requestCopyResponseEntity.getOutputPorts()) + .setFunnels(requestCopyResponseEntity.getFunnels()) + .setLabels(requestCopyResponseEntity.getLabels()) + .setProcessGroups(requestCopyResponseEntity.getProcessGroups()) + .setRemoteProcessGroups(requestCopyResponseEntity.getRemoteProcessGroups()) + .setConnections(requestCopyResponseEntity.getConnections()) + .setParameterContexts(requestCopyResponseEntity.getParameterContexts()) + .setParameterProviders(requestCopyResponseEntity.getParameterProviders()) + .build(); + + final PasteResponseEntity pasteResponseEntity = serviceFacade.pasteComponents(revision, groupId, additions, getIdGenerationSeed().orElse(null)); + + // prune response as necessary + for (ProcessGroupEntity childGroupEntity : pasteResponseEntity.getFlow().getProcessGroups()) { + childGroupEntity.getComponent().setContents(null); + } + + // create the response entity + populateRemainingSnippetContent(pasteResponseEntity.getFlow()); + + return generateOkResponse(pasteResponseEntity).build(); + } + ); + } + + private static VersionedProcessGroup getVersionedProcessGroup(final CopyResponseEntity copyResponse) { + final VersionedProcessGroup versionedProcessGroup = new VersionedProcessGroup(); + versionedProcessGroup.setProcessors(new HashSet<>(copyResponse.getProcessors())); + versionedProcessGroup.setInputPorts(new HashSet<>(copyResponse.getInputPorts())); + versionedProcessGroup.setOutputPorts(new HashSet<>(copyResponse.getOutputPorts())); + versionedProcessGroup.setProcessGroups(new HashSet<>(copyResponse.getProcessGroups())); + versionedProcessGroup.setRemoteProcessGroups(new HashSet<>(copyResponse.getRemoteProcessGroups())); + versionedProcessGroup.setFunnels(new HashSet<>(copyResponse.getFunnels())); + versionedProcessGroup.setLabels(new HashSet<>(copyResponse.getLabels())); + versionedProcessGroup.setConnections(new HashSet<>(copyResponse.getConnections())); + return versionedProcessGroup; + } + + private void mapVersionedIds(final VersionedProcessGroup group, final Map<String, String> idMapping, final Map<String, String> serviceIdMapping) { + group.getControllerServices().forEach(cs -> { + final String newId = generateUuid(cs.getIdentifier()); + idMapping.put(cs.getIdentifier(), newId); + serviceIdMapping.put(cs.getIdentifier(), newId); + cs.setIdentifier(newId); + }); + group.getControllerServices().forEach(cs -> { + cs.getProperties().entrySet().stream() + .filter(propertyEntry -> { + final Map<String, VersionedPropertyDescriptor> propertyDescriptors = cs.getPropertyDescriptors(); + if (propertyDescriptors != null) { + final VersionedPropertyDescriptor propertyDescriptor = propertyDescriptors.get(propertyEntry.getKey()); + if (propertyDescriptor != null && propertyDescriptor.getIdentifiesControllerService()) { + return serviceIdMapping.containsKey(propertyEntry.getValue()); + } + } + + return false; + }) + .forEach(serviceEntry -> serviceEntry.setValue(serviceIdMapping.get(serviceEntry.getValue()))); + }); + group.getProcessors().forEach(p -> { + final String newId = generateUuid(p.getIdentifier()); + idMapping.put(p.getIdentifier(), newId); + p.setIdentifier(newId); + + p.getProperties().entrySet().stream() + .filter(propertyEntry -> { + final Map<String, VersionedPropertyDescriptor> propertyDescriptors = p.getPropertyDescriptors(); + if (propertyDescriptors != null) { + final VersionedPropertyDescriptor propertyDescriptor = propertyDescriptors.get(propertyEntry.getKey()); + if (propertyDescriptor != null && propertyDescriptor.getIdentifiesControllerService()) { + return serviceIdMapping.containsKey(propertyEntry.getValue()); + } + } + + return false; + }) + .forEach(serviceEntry -> serviceEntry.setValue(serviceIdMapping.get(serviceEntry.getValue()))); + }); + group.getInputPorts().forEach(ip -> { + final String newId = generateUuid(ip.getIdentifier()); + idMapping.put(ip.getIdentifier(), newId); + ip.setIdentifier(newId); + }); + group.getOutputPorts().forEach(op -> { + final String newId = generateUuid(op.getIdentifier()); + idMapping.put(op.getIdentifier(), newId); + op.setIdentifier(newId); + }); + group.getFunnels().forEach(f -> { + final String newId = generateUuid(f.getIdentifier()); + idMapping.put(f.getIdentifier(), newId); + f.setIdentifier(newId); + }); + group.getLabels().forEach(l -> { + final String newId = generateUuid(l.getIdentifier()); + idMapping.put(l.getIdentifier(), newId); + l.setIdentifier(newId); + }); + group.getRemoteProcessGroups().forEach(rpg -> { + final String newId = generateUuid(rpg.getIdentifier()); + idMapping.put(rpg.getIdentifier(), newId); + rpg.setIdentifier(newId); + + if (rpg.getInputPorts() != null) { + rpg.getInputPorts().forEach(rip -> { + final String newRipId = generateUuid(rip.getIdentifier()); + idMapping.put(rip.getIdentifier(), newRipId); + rip.setIdentifier(newRipId); + }); + } + if (rpg.getOutputPorts() != null) { + rpg.getOutputPorts().forEach(rop -> { + final String newRopId = generateUuid(rop.getIdentifier()); + idMapping.put(rop.getIdentifier(), newRopId); + rop.setIdentifier(newRopId); + }); + } + }); + group.getProcessGroups().forEach(cpg -> { + final String newGroupId = generateUuid(cpg.getIdentifier()); + idMapping.put(cpg.getIdentifier(), newGroupId); + cpg.setIdentifier(newGroupId); + + if (cpg.getVersionedFlowCoordinates() == null) { + mapVersionedIds(cpg, idMapping, serviceIdMapping); + } + }); + group.getConnections().forEach(c -> { + final String newId = generateUuid(c.getIdentifier()); + idMapping.put(c.getIdentifier(), newId); + c.setIdentifier(newId); + + if (c.getSource() != null) { + final ConnectableComponent source = c.getSource(); + + // map the source id + final String sourceId = source.getId(); + final String newSourceId = idMapping.get(sourceId); + if (newSourceId != null) { + source.setId(newSourceId); + } + + // map the source group id + final String sourceGroupId = source.getGroupId(); + final String newSourceGroupId = idMapping.get(sourceGroupId); + if (newSourceGroupId != null) { + source.setGroupId(newSourceGroupId); + } + } + if (c.getDestination() != null) { + final ConnectableComponent destination = c.getDestination(); + + // map the destination id + final String destinationId = destination.getId(); + final String newDestinationId = idMapping.get(destinationId); + if (newDestinationId != null) { + destination.setId(newDestinationId); + } + + // map the destination group id + final String destinationGroupId = destination.getGroupId(); + final String newDestinationGroupId = idMapping.get(destinationGroupId); + if (newDestinationGroupId != null) { + destination.setGroupId(newDestinationGroupId); + } + } + }); + } + + /** + * For the specified versioned process group, identify any versioned processors or services that contain an + * instance id. If that instance id, identifies a local processor or service, ensure the user has permissions + * to READ the local instance. This is needed because sensitive properties from the local processor or service + * will be copied into the new components as part of the paste action. + * + * @param group the versioned group + * @param lookup the authorizable lookup + */ + private void authorizeInstanceIds(final VersionedProcessGroup group, final AuthorizableLookup lookup) { + final Set<String> processorInstanceIds = group.getProcessors().stream() + .map(VersionedComponent::getInstanceIdentifier) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + + lookup.getRootProcessGroup().getEncapsulatedProcessors(ca -> processorInstanceIds.contains(ca.getIdentifier())).forEach(ca -> { + ca.getAuthorizable().authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + }); + + final Set<String> serviceInstanceIds = group.getControllerServices().stream() + .map(VersionedComponent::getInstanceIdentifier) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + + lookup.getRootProcessGroup().getEncapsulatedControllerServices(ca -> serviceInstanceIds.contains(ca.getIdentifier())).forEach(ca -> { + ca.getAuthorizable().authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + }); + + group.getProcessGroups().forEach(cpg -> { + authorizeInstanceIds(cpg, lookup); + }); + } /** * Replace the Process Group contents with the given ID with the specified Process Group contents.
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java+1 −1 modified@@ -885,7 +885,7 @@ public Response updateRemoteProcessGroupRunStatuses( lookup -> { final ProcessGroupAuthorizable processGroup = lookup.getProcessGroup(processGroupId); - authorizeProcessGroup(processGroup, authorizer, lookup, RequestAction.READ, false, false, false, false); + authorizeProcessGroup(processGroup, authorizer, lookup, RequestAction.READ, false, false, false, false, false); Set<Authorizable> remoteProcessGroups = processGroup.getEncapsulatedRemoteProcessGroups(); for (Authorizable remoteProcessGroup : remoteProcessGroups) {
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SnippetResource.java+3 −3 modified@@ -115,7 +115,7 @@ private void authorizeSnippetRequest(final SnippetDTO snippetRequest, final Auth snippetRequest.getProcessGroups().keySet().stream().map(id -> lookup.getProcessGroup(id)).forEach(processGroupAuthorizable -> { // we are not checking referenced services since we do not know how this snippet will be used. these checks should be performed // in a subsequent action with this snippet - authorizeProcessGroup(processGroupAuthorizable, authorizer, lookup, action, false, false, false, false); + authorizeProcessGroup(processGroupAuthorizable, authorizer, lookup, action, false, false, false, false, false); }); snippetRequest.getRemoteProcessGroups().keySet().stream().map(id -> lookup.getRemoteProcessGroup(id)).forEach(authorize); snippetRequest.getProcessors().keySet().stream().map(id -> lookup.getProcessor(id).getAuthorizable()).forEach(authorize); @@ -282,7 +282,7 @@ public Response updateSnippet( final SnippetAuthorizable snippet = lookup.getSnippet(snippetId); // Note: we are explicitly not authorizing parameter references here because they are being authorized below - authorizeSnippet(snippet, authorizer, lookup, RequestAction.WRITE, false, false, false); + authorizeSnippet(snippet, authorizer, lookup, RequestAction.WRITE, false, false, false, false); final ProcessGroup destinationGroup = lookup.getProcessGroup(requestSnippetDTO.getParentGroupId()).getProcessGroup(); @@ -356,7 +356,7 @@ public Response deleteSnippet( lookup -> { // ensure write permission to every component in the snippet excluding referenced services final SnippetAuthorizable snippet = lookup.getSnippet(snippetId); - authorizeSnippet(snippet, authorizer, lookup, RequestAction.WRITE, true, false, false); + authorizeSnippet(snippet, authorizer, lookup, RequestAction.WRITE, true, false, false, false); // ensure write permission to the parent process group snippet.getParentProcessGroup().getAuthorizable().authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java+5 −5 modified@@ -172,7 +172,7 @@ public Response exportFlowVersion(@Parameter(description = "The process group id final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId); // ensure access to process groups (nested), encapsulated controller services and referenced parameter contexts authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, - false, false, true); + false, false, false, true); }); // get the versioned flow @@ -594,7 +594,7 @@ public Response saveToFlowRegistry( processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); // require read to this group and all descendants - authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, false, true, true); + authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, false, true, false, true); }, () -> { final VersionedFlowDTO versionedFlow = requestEntity.getVersionedFlow(); @@ -1179,10 +1179,10 @@ public Response initiateRevertFlowVersion(@Parameter(description = "The process serviceFacade.discoverCompatibleBundles(flowSnapshot.getFlowContents()); // If there are any Controller Services referenced that are inherited from the parent group, resolve those to point to the appropriate Controller Service, if we are able to. - serviceFacade.resolveInheritedControllerServices(flowSnapshotContainer, groupId, NiFiUserUtils.getNiFiUser()); + final Set<String> unresolvedControllerServices = serviceFacade.resolveInheritedControllerServices(flowSnapshotContainer, groupId, NiFiUserUtils.getNiFiUser()); // If there are any Parameter Providers referenced by Parameter Contexts, resolve these to point to the appropriate Parameter Provider, if we are able to. - serviceFacade.resolveParameterProviders(flowSnapshot, NiFiUserUtils.getNiFiUser()); + final Set<String> unresolvedParameterProviders = serviceFacade.resolveParameterProviders(flowSnapshot, NiFiUserUtils.getNiFiUser()); // Step 1: Determine which components will be affected by updating the version final Set<AffectedComponentEntity> affectedComponents = serviceFacade.getComponentsAffectedByFlowUpdate(groupId, flowSnapshot); @@ -1197,7 +1197,7 @@ public Response initiateRevertFlowVersion(@Parameter(description = "The process serviceFacade, requestWrapper, requestRevision, - lookup -> authorizeFlowUpdate(lookup, user, groupId, flowSnapshot), + lookup -> authorizeFlowUpdate(lookup, user, groupId, flowSnapshot, unresolvedControllerServices, unresolvedParameterProviders), () -> { // Step 3: Verify that all components in the snapshot exist on all nodes // Step 4: Verify that Process Group is already under version control. If not, must start Version Control instead of updating flow
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java+13 −0 modified@@ -31,10 +31,12 @@ import org.apache.nifi.flow.ExecutionEngine; import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.groups.ComponentAdditions; import org.apache.nifi.groups.FlowFileConcurrency; import org.apache.nifi.groups.FlowFileOutboundPolicy; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.groups.VersionedComponentAdditions; import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.registry.flow.FlowRegistryClientNode; import org.apache.nifi.registry.flow.StandardVersionControlInformation; @@ -510,6 +512,17 @@ public ProcessGroup disconnectVersionControl(final String groupId) { return group; } + @Override + public ComponentAdditions addVersionedComponents(final String groupId, final VersionedComponentAdditions additions, final String componentIdSeed) { + final ProcessGroup group = locateProcessGroup(flowController, groupId); + final ComponentAdditions componentAdditions = group.addVersionedComponents(additions, componentIdSeed); + group.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize); + + group.onComponentModified(); + + return componentAdditions; + } + @Override public ProcessGroup updateProcessGroupFlow(final String groupId, final VersionedExternalFlow proposedSnapshot, final VersionControlInformationDTO versionControlInformation, final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings, final boolean updateDescendantVersionedFlows) {
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java+12 −0 modified@@ -20,7 +20,9 @@ import org.apache.nifi.controller.queue.DropFlowFileStatus; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.flow.VersionedExternalFlow; +import org.apache.nifi.groups.ComponentAdditions; import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.VersionedComponentAdditions; import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.VersionControlInformationDTO; import org.apache.nifi.web.api.entity.ProcessGroupRecursivity; @@ -149,6 +151,16 @@ public interface ProcessGroupDAO { ProcessGroup updateProcessGroupFlow(String groupId, VersionedExternalFlow proposedSnapshot, VersionControlInformationDTO versionControlInformation, String componentIdSeed, boolean verifyNotModified, boolean updateSettings, boolean updateDescendantVersionedFlows); + /** + * Adds versioned components to a Process Group + * + * @param groupId the ID of the process group + * @param additions the additions to add to this Process Group + * @param componentIdSeed the seed value to use for generating ID's for new components + * @return the component additions + */ + ComponentAdditions addVersionedComponents(String groupId, VersionedComponentAdditions additions, String componentIdSeed); + /** * Applies the given Version Control Information to the Process Group *
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java+28 −2 modified@@ -38,6 +38,7 @@ import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedReportingTaskSnapshot; import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.VersionedComponentAdditions; import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.parameter.ParameterGroupConfiguration; import org.apache.nifi.registry.flow.FlowLocation; @@ -108,6 +109,8 @@ import org.apache.nifi.web.api.entity.ControllerConfigurationEntity; import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity; +import org.apache.nifi.web.api.entity.CopyRequestEntity; +import org.apache.nifi.web.api.entity.CopyResponseEntity; import org.apache.nifi.web.api.entity.CurrentUserEntity; import org.apache.nifi.web.api.entity.FlowAnalysisResultEntity; import org.apache.nifi.web.api.entity.FlowAnalysisRuleEntity; @@ -126,6 +129,7 @@ import org.apache.nifi.web.api.entity.ParameterContextEntity; import org.apache.nifi.web.api.entity.ParameterProviderEntity; import org.apache.nifi.web.api.entity.ParameterProviderReferencingComponentsEntity; +import org.apache.nifi.web.api.entity.PasteResponseEntity; import org.apache.nifi.web.api.entity.PortEntity; import org.apache.nifi.web.api.entity.PortStatusEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; @@ -1663,6 +1667,15 @@ VersionControlInformationEntity setVersionControlInformation(Revision processGro */ FlowSnapshotContainer getVersionedFlowSnapshotByGroupId(String processGroupId); + /** + * Copies the requested components from the specified Process Group. + * + * @param groupId the group id + * @param copyRequest the copy request + * @return the copy response + */ + CopyResponseEntity copyComponents(String groupId, CopyRequestEntity copyRequest); + /** * Get the current state of the Process Group with the given ID, converted to a Versioned Flow Snapshot * @@ -1762,6 +1775,17 @@ VersionControlInformationEntity setVersionControlInformation(Revision processGro */ void verifyCanRevertLocalModifications(String groupId, RegisteredFlowSnapshot versionedFlowSnapshot); + /** + * Adds versioned components to the specified Process Group + * + * @param revision the revision of the Process Group + * @param groupId the ID of the Process Group + * @param additions the components to add + * @param componentIdSeed the seed to use for generating new component ID's + * @return the Paste response entity + */ + PasteResponseEntity pasteComponents(Revision revision, String groupId, VersionedComponentAdditions additions, String componentIdSeed); + /** * Updates the Process group with the given ID to match the new snapshot * @@ -2755,17 +2779,19 @@ ControllerServiceReferencingComponentsEntity updateControllerServiceReferencingC * @param flowSnapshotContainer the flow snapshot container * @param parentGroupId the ID of the Process Group from which the Controller Services are inherited * @param user the NiFi user on whose behalf the request is happening; this user is used for validation so that only the Controller Services that the user has READ permissions to are included + * @return Any unresolved controller services */ - void resolveInheritedControllerServices(FlowSnapshotContainer flowSnapshotContainer, String parentGroupId, NiFiUser user); + Set<String> resolveInheritedControllerServices(FlowSnapshotContainer flowSnapshotContainer, String parentGroupId, NiFiUser user); /** * For any Parameter Provider that is found in the given Versioned Process Group, attempts to find an existing Parameter Provider that matches the definition. If any is found, * the Parameter Context within the Versioned Process Group is updated to point to the existing Parameter Provider. * * @param versionedFlowSnapshot the flow snapshot * @param user the NiFi user on whose behalf the request is happening; this user is used for validation so that only the Parameter Providers that the user has READ permissions to are included + * @return any unresolved parameter provider ids */ - void resolveParameterProviders(RegisteredFlowSnapshot versionedFlowSnapshot, NiFiUser user); + Set<String> resolveParameterProviders(RegisteredFlowSnapshot versionedFlowSnapshot, NiFiUser user); /** * @param type the component type
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java+283 −40 modified@@ -125,15 +125,22 @@ import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedExternalFlowMetadata; import org.apache.nifi.flow.VersionedFlowCoordinates; +import org.apache.nifi.flow.VersionedFunnel; +import org.apache.nifi.flow.VersionedLabel; import org.apache.nifi.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedPort; import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flow.VersionedProcessor; import org.apache.nifi.flow.VersionedPropertyDescriptor; +import org.apache.nifi.flow.VersionedRemoteProcessGroup; import org.apache.nifi.flow.VersionedReportingTask; import org.apache.nifi.flow.VersionedReportingTaskSnapshot; import org.apache.nifi.flowanalysis.FlowAnalysisRule; +import org.apache.nifi.groups.ComponentAdditions; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroupCounts; import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.groups.VersionedComponentAdditions; import org.apache.nifi.history.History; import org.apache.nifi.history.HistoryQuery; import org.apache.nifi.history.PreviousValue; @@ -190,11 +197,14 @@ import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow; import org.apache.nifi.registry.flow.diff.StandardFlowComparator; import org.apache.nifi.registry.flow.diff.StaticDifferenceDescriptor; +import org.apache.nifi.registry.flow.mapping.ComponentIdLookup; +import org.apache.nifi.registry.flow.mapping.FlowMappingOptions; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedPort; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteGroupPort; import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; +import org.apache.nifi.registry.flow.mapping.VersionedComponentStateLookup; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinQuery; @@ -323,6 +333,8 @@ import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity; import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity; +import org.apache.nifi.web.api.entity.CopyRequestEntity; +import org.apache.nifi.web.api.entity.CopyResponseEntity; import org.apache.nifi.web.api.entity.CurrentUserEntity; import org.apache.nifi.web.api.entity.FlowAnalysisResultEntity; import org.apache.nifi.web.api.entity.FlowAnalysisRuleEntity; @@ -343,6 +355,7 @@ import org.apache.nifi.web.api.entity.ParameterProviderEntity; import org.apache.nifi.web.api.entity.ParameterProviderReferencingComponentEntity; import org.apache.nifi.web.api.entity.ParameterProviderReferencingComponentsEntity; +import org.apache.nifi.web.api.entity.PasteResponseEntity; import org.apache.nifi.web.api.entity.PortEntity; import org.apache.nifi.web.api.entity.PortStatusEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; @@ -426,6 +439,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Predicate; @@ -2902,40 +2916,6 @@ private FlowDTO postProcessNewFlowSnippet(final String groupId, final FlowSnippe // validate the new snippet validateSnippetContents(snippet); - // identify all components added - final Set<String> identifiers = new HashSet<>(); - snippet.getProcessors().stream() - .map(proc -> proc.getId()) - .forEach(id -> identifiers.add(id)); - snippet.getConnections().stream() - .map(conn -> conn.getId()) - .forEach(id -> identifiers.add(id)); - snippet.getInputPorts().stream() - .map(port -> port.getId()) - .forEach(id -> identifiers.add(id)); - snippet.getOutputPorts().stream() - .map(port -> port.getId()) - .forEach(id -> identifiers.add(id)); - snippet.getProcessGroups().stream() - .map(group -> group.getId()) - .forEach(id -> identifiers.add(id)); - snippet.getRemoteProcessGroups().stream() - .map(remoteGroup -> remoteGroup.getId()) - .forEach(id -> identifiers.add(id)); - snippet.getRemoteProcessGroups().stream() - .filter(remoteGroup -> remoteGroup.getContents() != null && remoteGroup.getContents().getInputPorts() != null) - .flatMap(remoteGroup -> remoteGroup.getContents().getInputPorts().stream()) - .map(remoteInputPort -> remoteInputPort.getId()) - .forEach(id -> identifiers.add(id)); - snippet.getRemoteProcessGroups().stream() - .filter(remoteGroup -> remoteGroup.getContents() != null && remoteGroup.getContents().getOutputPorts() != null) - .flatMap(remoteGroup -> remoteGroup.getContents().getOutputPorts().stream()) - .map(remoteOutputPort -> remoteOutputPort.getId()) - .forEach(id -> identifiers.add(id)); - snippet.getLabels().stream() - .map(label -> label.getId()) - .forEach(id -> identifiers.add(id)); - final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId); return dtoFactory.createFlowDto(group, groupStatus, snippet, revisionManager, this::getProcessGroupBulletins); @@ -4151,20 +4131,36 @@ public void discoverCompatibleBundles(final VersionedReportingTaskSnapshot repor } @Override - public void resolveParameterProviders(final RegisteredFlowSnapshot versionedFlowSnapshot, final NiFiUser user) { + public Set<String> resolveParameterProviders(final RegisteredFlowSnapshot versionedFlowSnapshot, final NiFiUser user) { final Map<String, ParameterProviderReference> parameterProviderReferences = versionedFlowSnapshot.getParameterProviders(); if (parameterProviderReferences == null || parameterProviderReferences.isEmpty() || versionedFlowSnapshot.getParameterContexts() == null || versionedFlowSnapshot.getParameterContexts().isEmpty()) { - return; + return Collections.emptySet(); } final Set<ParameterProviderNode> parameterProviderNodes = parameterProviderDAO.getParameterProviders().stream() .filter(provider -> provider.isAuthorized(authorizer, RequestAction.READ, user)) .collect(Collectors.toSet()); + final Set<String> unresolvedParameterProviderIds = new HashSet<>(); for (final VersionedParameterContext parameterContext : versionedFlowSnapshot.getParameterContexts().values()) { - resolveParameterProvider(parameterContext, parameterProviderNodes, parameterProviderReferences); + final String proposedParameterProviderId = parameterContext.getParameterProvider(); + + if (proposedParameterProviderId != null) { + // attempt to resolve the parameter provider + resolveParameterProvider(parameterContext, parameterProviderNodes, parameterProviderReferences); + + // if the parameter provider is unchanged it means that the referenced provider is not in + // parameter provider nodes because it doesn't exist or the user does not have access to + // it. it could also be unchanged if the id already matches an available provider. + final String resolvedParameterProviderId = parameterContext.getParameterProvider(); + if (proposedParameterProviderId.equals(resolvedParameterProviderId)) { + unresolvedParameterProviderIds.add(proposedParameterProviderId); + } + } } + + return unresolvedParameterProviderIds; } private void resolveParameterProvider(final VersionedParameterContext parameterContext, final Set<ParameterProviderNode> availableParameterProviders, @@ -4201,8 +4197,8 @@ private void resolveParameterProvider(final VersionedParameterContext parameterC } @Override - public void resolveInheritedControllerServices(final FlowSnapshotContainer flowSnapshotContainer, final String processGroupId, final NiFiUser user) { - controllerFacade.getControllerServiceResolver().resolveInheritedControllerServices(flowSnapshotContainer, processGroupId, user); + public Set<String> resolveInheritedControllerServices(final FlowSnapshotContainer flowSnapshotContainer, final String processGroupId, final NiFiUser user) { + return controllerFacade.getControllerServiceResolver().resolveInheritedControllerServices(flowSnapshotContainer, processGroupId, user); } @Override @@ -4935,8 +4931,10 @@ public ProcessGroupFlowEntity getProcessGroupFlow(final String groupId, final bo // has a Processor, we don't care about the individual stats of that Processor because the ProcessGroupFlowEntity // doesn't include that anyway. So we can avoid including the information in the status that is returned. final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId, 1); + final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processGroup.getIdentifier())); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup); - return entityFactory.createProcessGroupFlowEntity(dtoFactory.createProcessGroupFlowDto(processGroup, groupStatus, revisionManager, this::getProcessGroupBulletins, uiOnly), permissions); + return entityFactory.createProcessGroupFlowEntity(dtoFactory.createProcessGroupFlowDto(processGroup, groupStatus, + revisionManager, this::getProcessGroupBulletins, uiOnly), revision, permissions); } @Override @@ -5242,6 +5240,108 @@ private VersionControlComponentMappingEntity createVersionControlComponentMappin return entity; } + @Override + public CopyResponseEntity copyComponents(final String groupId, final CopyRequestEntity copyRequest) { + final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); + + final FlowMappingOptions mappingOptions = new FlowMappingOptions.Builder() + .sensitiveValueEncryptor(null) + .stateLookup(VersionedComponentStateLookup.ENABLED_OR_DISABLED) + .componentIdLookup(ComponentIdLookup.VERSIONED_OR_GENERATE) + .mapPropertyDescriptors(true) + .mapSensitiveConfiguration(false) + .mapInstanceIdentifiers(true) + .mapControllerServiceReferencesToVersionedId(true) + .mapFlowRegistryClientId(true) + .mapAssetReferences(false) + .build(); + + final NiFiRegistryFlowMapper mapper = makeNiFiRegistryFlowMapper(controllerFacade.getExtensionManager(), mappingOptions); + final InstantiatedVersionedProcessGroup versionedProcessGroup = + mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), controllerFacade.getFlowManager(), true); + + final Set<VersionedProcessGroup> versionedProcessGroups = versionedProcessGroup.getProcessGroups().stream() + .filter(pg -> copyRequest.getProcessGroups().contains(pg.getInstanceIdentifier())) + .collect(Collectors.toSet()); + final Set<VersionedRemoteProcessGroup> versionedRemoteProcessGroups = versionedProcessGroup.getRemoteProcessGroups().stream() + .filter(rpg -> copyRequest.getRemoteProcessGroups().contains(rpg.getInstanceIdentifier())) + .collect(Collectors.toSet()); + final Set<VersionedProcessor> versionedProcessors = versionedProcessGroup.getProcessors().stream() + .filter(p -> copyRequest.getProcessors().contains(p.getInstanceIdentifier())) + .collect(Collectors.toSet()); + final Set<VersionedPort> versionedInputPorts = versionedProcessGroup.getInputPorts().stream() + .filter(ip -> copyRequest.getInputPorts().contains(ip.getInstanceIdentifier())) + .collect(Collectors.toSet()); + final Set<VersionedPort> versionedOutputPorts = versionedProcessGroup.getOutputPorts().stream() + .filter(op -> copyRequest.getOutputPorts().contains(op.getInstanceIdentifier())) + .collect(Collectors.toSet()); + final Set<VersionedFunnel> versionedFunnels = versionedProcessGroup.getFunnels().stream() + .filter(f -> copyRequest.getFunnels().contains(f.getInstanceIdentifier())) + .collect(Collectors.toSet()); + final Set<VersionedLabel> versionedLabels = versionedProcessGroup.getLabels().stream() + .filter(l -> copyRequest.getLabels().contains(l.getInstanceIdentifier())) + .collect(Collectors.toSet()); + final Set<VersionedConnection> versionedConnections = versionedProcessGroup.getConnections().stream() + .filter(c -> copyRequest.getConnections().contains(c.getInstanceIdentifier())) + .collect(Collectors.toSet()); + + // include any top level services as external as the top level isn't included + final Map<String, ExternalControllerServiceReference> externalControllerServices = + versionedProcessGroup.getExternalControllerServiceReferences().entrySet().stream() + .filter(e -> isServiceReferenced(e.getKey(), versionedProcessors, Collections.emptySet(), versionedProcessGroups)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + // move any service at the current level into external services + versionedProcessGroup.getControllerServices().stream() + .filter(cs -> isServiceReferenced(cs.getIdentifier(), versionedProcessors, Collections.emptySet(), versionedProcessGroups)) + .forEach(vcs -> { + final ExternalControllerServiceReference externalControllerService = new ExternalControllerServiceReference(); + externalControllerService.setIdentifier(vcs.getIdentifier()); + externalControllerService.setName(vcs.getName()); + externalControllerServices.put(vcs.getIdentifier(), externalControllerService); + }); + + final Map<String, VersionedParameterContext> parameterContexts = new HashMap<>(); + final Map<String, ParameterProviderReference> parameterProviderReferences = new HashMap<>(); + + // Create a complete (include descendant flows) map of parameter contexts + processGroup.getProcessGroups().stream() + .filter(pg -> copyRequest.getProcessGroups().contains(pg.getIdentifier())) + .forEach(pg -> parameterContexts.putAll(mapper.mapParameterContexts(pg, true, parameterProviderReferences))); + + // build the copy response payload + final CopyResponseEntity copyResponseEntity = new CopyResponseEntity(); + copyResponseEntity.setId(UUID.randomUUID().toString()); + copyResponseEntity.setExternalControllerServiceReferences(externalControllerServices); + copyResponseEntity.setParameterContexts(parameterContexts); + copyResponseEntity.setParameterProviders(parameterProviderReferences); + copyResponseEntity.setProcessGroups(versionedProcessGroups); + copyResponseEntity.setRemoteProcessGroups(versionedRemoteProcessGroups); + copyResponseEntity.setProcessors(versionedProcessors); + copyResponseEntity.setInputPorts(versionedInputPorts); + copyResponseEntity.setOutputPorts(versionedOutputPorts); + copyResponseEntity.setFunnels(versionedFunnels); + copyResponseEntity.setLabels(versionedLabels); + copyResponseEntity.setConnections(versionedConnections); + + return copyResponseEntity; + } + + private boolean isServiceReferenced(final String serviceId, final Set<VersionedProcessor> processors, + final Set<VersionedControllerService> services, final Set<VersionedProcessGroup> groups) { + final boolean usedInProcessor = processors.stream().anyMatch(p -> p.getProperties().containsValue(serviceId)); + if (usedInProcessor) { + return true; + } + + final boolean usedInService = services.stream().anyMatch(cs -> cs.getProperties().containsValue(serviceId)); + if (usedInService) { + return true; + } + + return groups.stream().anyMatch(pg -> isServiceReferenced(serviceId, pg.getProcessors(), pg.getControllerServices(), pg.getProcessGroups())); + } + @Override public RegisteredFlowSnapshot getCurrentFlowSnapshotByGroupId(final String processGroupId) { return getCurrentFlowSnapshotByGroupId(processGroupId, false); @@ -6102,6 +6202,138 @@ private List<Revision> getComponentRevisions(final ProcessGroup processGroup, fi return revisions; } + /** + * For each versioned processor, if there is an instance id and that instance exists locally, + * all sensitive properties from the local instance is copied into the versioned processor. + * + * @param processors the versioned processors to consider + */ + private void copySensitiveProcessorProperties(final Set<VersionedProcessor> processors) { + final FlowManager flowManager = controllerFacade.getFlowManager(); + + processors.forEach(p -> { + if (p.getInstanceIdentifier() != null) { + final ProcessorNode copiedInstance = flowManager.getProcessorNode(p.getInstanceIdentifier()); + if (copiedInstance != null) { + copiedInstance.getProperties().keySet().stream() + .filter(PropertyDescriptor::isSensitive) + .forEach(pd -> { + p.getProperties().put(pd.getName(), copiedInstance.getRawPropertyValue(pd)); + }); + } + } + }); + } + + /** + * For each versioned service, if there is an instance id and that instance exists locally, + * all sensitive properties from the local instance is copied into the versioned service. + * + * @param services the versioned services to consider + */ + private void copySensitiveServiceProperties(final Set<VersionedControllerService> services) { + final FlowManager flowManager = controllerFacade.getFlowManager(); + + services.forEach(s -> { + if (s.getInstanceIdentifier() != null) { + final ControllerServiceNode copiedInstance = flowManager.getControllerServiceNode(s.getInstanceIdentifier()); + if (copiedInstance != null) { + copiedInstance.getProperties().keySet().stream() + .filter(PropertyDescriptor::isSensitive) + .forEach(pd -> { + s.getProperties().put(pd.getName(), copiedInstance.getRawPropertyValue(pd)); + }); + } + } + }); + } + + /** + * For each versioned group, all versioned processors and services will attempt to copy sensitive + * properties from a local instance, if possible. + * + * @param groups the versioned groups to consider + */ + private void copySensitiveDescendantProperties(final Set<VersionedProcessGroup> groups) { + groups.forEach(pg -> { + copySensitiveServiceProperties(pg.getControllerServices()); + copySensitiveProcessorProperties(pg.getProcessors()); + copySensitiveDescendantProperties(pg.getProcessGroups()); + }); + } + + @Override + public PasteResponseEntity pasteComponents(final Revision revision, final String groupId, final VersionedComponentAdditions additions, final String componentIdSeed) { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + final RevisionUpdate<FlowSnippetDTO> snapshot = revisionManager.updateRevision(new StandardRevisionClaim(revision), user, () -> { + // preprocess the additions and copy over any sensitive properties + copySensitiveServiceProperties(additions.getControllerServices()); + copySensitiveProcessorProperties(additions.getProcessors()); + copySensitiveDescendantProperties(additions.getProcessGroups()); + + // add the versioned components + final ComponentAdditions componentAdditions = processGroupDAO.addVersionedComponents(groupId, additions, componentIdSeed); + + // save + controllerFacade.save(); + + // gather details for response + final Set<ControllerServiceDTO> services = componentAdditions.getControllerServices().stream() + .map(s -> dtoFactory.createControllerServiceDto(s)) + .collect(Collectors.toSet()); + final Set<ProcessorDTO> processors = componentAdditions.getProcessors().stream() + .map(p -> dtoFactory.createProcessorDto(p)) + .collect(Collectors.toSet()); + final Set<PortDTO> inputPorts = componentAdditions.getInputPorts().stream() + .map(ip -> dtoFactory.createPortDto(ip)) + .collect(Collectors.toSet()); + final Set<PortDTO> outputPorts = componentAdditions.getOutputPorts().stream() + .map(op -> dtoFactory.createPortDto(op)) + .collect(Collectors.toSet()); + final Set<FunnelDTO> funnels = componentAdditions.getFunnels().stream() + .map(f -> dtoFactory.createFunnelDto(f)) + .collect(Collectors.toSet()); + final Set<LabelDTO> labels = componentAdditions.getLabels().stream() + .map(l -> dtoFactory.createLabelDto(l)) + .collect(Collectors.toSet()); + final Set<RemoteProcessGroupDTO> remoteProcessGroups = componentAdditions.getRemoteProcessGroups().stream() + .map(rpg -> dtoFactory.createRemoteProcessGroupDto(rpg)) + .collect(Collectors.toSet()); + final Set<ProcessGroupDTO> processGroups = componentAdditions.getProcessGroups().stream() + .map(pg -> dtoFactory.createProcessGroupDto(pg)) + .collect(Collectors.toSet()); + final Set<ConnectionDTO> connections = componentAdditions.getConnections().stream() + .map(c -> dtoFactory.createConnectionDto(c)) + .collect(Collectors.toSet()); + + // return the details using a flow snippet dto + final FlowSnippetDTO flowSnippetDTO = new FlowSnippetDTO(); + flowSnippetDTO.setControllerServices(services); + flowSnippetDTO.setProcessors(processors); + flowSnippetDTO.setInputPorts(inputPorts); + flowSnippetDTO.setOutputPorts(outputPorts); + flowSnippetDTO.setFunnels(funnels); + flowSnippetDTO.setLabels(labels); + flowSnippetDTO.setRemoteProcessGroups(remoteProcessGroups); + flowSnippetDTO.setProcessGroups(processGroups); + flowSnippetDTO.setConnections(connections); + + final Revision updatedRevision = revisionManager.getRevision(revision.getComponentId()).incrementRevision(revision.getClientId()); + final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity()); + return new StandardRevisionUpdate<>(flowSnippetDTO, lastModification); + }); + + // post process new flow snippet + final FlowDTO flowDto = postProcessNewFlowSnippet(groupId, snapshot.getComponent()); + final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); + + final PasteResponseEntity pasteEntity = new PasteResponseEntity(); + pasteEntity.setFlow(flowDto); + pasteEntity.setRevision(updatedRevision); + return pasteEntity; + } + @Override public ProcessGroupEntity updateProcessGroupContents(final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo, final RegisteredFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, @@ -6957,6 +7189,17 @@ protected NiFiRegistryFlowMapper makeNiFiRegistryFlowMapper(final ExtensionManag return new NiFiRegistryFlowMapper(extensionManager); } + /** + * Create a new flow mapper using a mockable method for testing + * + * @param extensionManager the extension manager to create the flow mapper with + * @param options the flow mapping options + * @return a new NiFiRegistryFlowMapper instance + */ + protected NiFiRegistryFlowMapper makeNiFiRegistryFlowMapper(final ExtensionManager extensionManager, final FlowMappingOptions options) { + return new NiFiRegistryFlowMapper(extensionManager, options); + } + @Autowired public void setProperties(final NiFiProperties properties) { this.properties = properties;
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java+155 −0 modified@@ -33,9 +33,13 @@ import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.ResourceFactory; import org.apache.nifi.authorization.resource.ResourceType; +import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserDetails; import org.apache.nifi.authorization.user.StandardNiFiUser.Builder; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.PropertyConfiguration; import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; @@ -44,24 +48,31 @@ import org.apache.nifi.flow.ParameterProviderReference; import org.apache.nifi.flow.VersionedControllerService; import org.apache.nifi.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedProcessor; import org.apache.nifi.flow.VersionedPropertyDescriptor; import org.apache.nifi.flow.VersionedReportingTask; import org.apache.nifi.flow.VersionedReportingTaskSnapshot; import org.apache.nifi.flowanalysis.EnforcementPolicy; +import org.apache.nifi.groups.ComponentAdditions; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.groups.VersionedComponentAdditions; import org.apache.nifi.history.History; import org.apache.nifi.history.HistoryQuery; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.registry.flow.FlowRegistryUtil; import org.apache.nifi.registry.flow.RegisteredFlowSnapshot; import org.apache.nifi.registry.flow.VersionControlInformation; +import org.apache.nifi.registry.flow.mapping.FlowMappingOptions; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup; import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinFactory; import org.apache.nifi.reporting.ComponentType; +import org.apache.nifi.reporting.UserAwareEventAccess; +import org.apache.nifi.services.FlowService; import org.apache.nifi.util.MockBulletinRepository; +import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.validation.RuleViolation; import org.apache.nifi.validation.RuleViolationsManager; import org.apache.nifi.web.api.dto.DtoFactory; @@ -72,6 +83,8 @@ import org.apache.nifi.web.api.dto.action.HistoryQueryDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import org.apache.nifi.web.api.entity.ActionEntity; +import org.apache.nifi.web.api.entity.CopyRequestEntity; +import org.apache.nifi.web.api.entity.CopyResponseEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; import org.apache.nifi.web.api.entity.StatusHistoryEntity; import org.apache.nifi.web.api.entity.TenantEntity; @@ -81,6 +94,7 @@ import org.apache.nifi.web.dao.RemoteProcessGroupDAO; import org.apache.nifi.web.dao.UserDAO; import org.apache.nifi.web.dao.UserGroupDAO; +import org.apache.nifi.web.revision.NaiveRevisionManager; import org.apache.nifi.web.revision.RevisionManager; import org.apache.nifi.web.revision.RevisionUpdate; import org.apache.nifi.web.revision.StandardRevisionUpdate; @@ -106,6 +120,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -118,10 +133,14 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.same; import static org.mockito.Mockito.spy; @@ -242,9 +261,23 @@ public Resource getResource() { when(flowController.getResource()).thenCallRealMethod(); when(flowController.getParentAuthorizable()).thenCallRealMethod(); + final UserAwareEventAccess eventAccess = mock(UserAwareEventAccess.class); + when(flowController.getEventAccess()).thenReturn(eventAccess); + when(eventAccess.getGroupStatus(anyString(), any(NiFiUser.class), anyInt())).thenReturn(mock(ProcessGroupStatus.class)); + + // props + final NiFiProperties properties = mock(NiFiProperties.class); + when(properties.getFlowServiceWriteDelay()).thenReturn("0 sec"); + + // flow service + final FlowService flowService = mock(FlowService.class); + doNothing().when(flowService).saveFlowChanges(any(TimeUnit.class), anyLong()); + // controller facade final ControllerFacade controllerFacade = new ControllerFacade(); controllerFacade.setFlowController(flowController); + controllerFacade.setProperties(properties); + controllerFacade.setFlowService(flowService); processGroupDAO = mock(ProcessGroupDAO.class, Answers.RETURNS_DEEP_STUBS); ruleViolationsManager = mock(RuleViolationsManager.class); @@ -380,6 +413,128 @@ public void testGetActionsForUser2() { }); } + @Test + public void testCopyComponents() { + final String groupId = UUID.randomUUID().toString(); + final ProcessGroup processGroup = mock(ProcessGroup.class); + + when(processGroupDAO.getProcessGroup(groupId)).thenReturn(processGroup); + + final FlowManager flowManager = mock(FlowManager.class); + final ExtensionManager extensionManager = mock(ExtensionManager.class); + when(flowController.getFlowManager()).thenReturn(flowManager); + when(flowController.getExtensionManager()).thenReturn(extensionManager); + + final ControllerServiceProvider controllerServiceProvider = mock(ControllerServiceProvider.class); + when(flowController.getControllerServiceProvider()).thenReturn(controllerServiceProvider); + + final VersionControlInformation versionControlInformation = mock(VersionControlInformation.class); + when(processGroup.getVersionControlInformation()).thenReturn(versionControlInformation); + + // use spy to mock the make() method for generating a new flow mapper to make this testable + final StandardNiFiServiceFacade serviceFacadeSpy = spy(serviceFacade); + final NiFiRegistryFlowMapper flowMapper = mock(NiFiRegistryFlowMapper.class); + doReturn(flowMapper).when(serviceFacadeSpy).makeNiFiRegistryFlowMapper(eq(extensionManager), any(FlowMappingOptions.class)); + + final InstantiatedVersionedProcessGroup nonVersionedProcessGroup = mock(InstantiatedVersionedProcessGroup.class); + when(flowMapper.mapProcessGroup(processGroup, controllerServiceProvider, flowManager, true)).thenReturn(nonVersionedProcessGroup); + + final String controllerServiceId = "controllerServiceId"; + final String processorOneId = "processorOneId"; + final String processorTwoId = "processorTwoId"; + final VersionedProcessor one = mock(VersionedProcessor.class); + when(one.getInstanceIdentifier()).thenReturn(processorOneId); + final VersionedProcessor two = mock(VersionedProcessor.class); + when(two.getInstanceIdentifier()).thenReturn(processorTwoId); + when(two.getProperties()).thenReturn(Map.of("CS Property", controllerServiceId)); + + final Set<VersionedProcessor> versionedProcessors = Set.of(one, two); + when(nonVersionedProcessGroup.getProcessors()).thenReturn(versionedProcessors); + + final ExternalControllerServiceReference externalControllerServiceReference = mock(ExternalControllerServiceReference.class); + final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences = new LinkedHashMap<>(); + externalControllerServiceReferences.put(controllerServiceId, externalControllerServiceReference); + when(nonVersionedProcessGroup.getExternalControllerServiceReferences()).thenReturn(externalControllerServiceReferences); + + final CopyRequestEntity copyRequestEntity = new CopyRequestEntity(); + copyRequestEntity.setProcessors(Set.of(processorOneId)); + CopyResponseEntity copyResponseEntity = serviceFacadeSpy.copyComponents(groupId, copyRequestEntity); + + assertNotNull(copyResponseEntity); + assertEquals(1, copyResponseEntity.getProcessors().size()); + assertEquals(processorOneId, copyResponseEntity.getProcessors().iterator().next().getInstanceIdentifier()); + assertTrue(copyResponseEntity.getExternalControllerServiceReferences().isEmpty()); + assertTrue(copyResponseEntity.getParameterContexts().isEmpty()); + assertTrue(copyResponseEntity.getParameterProviders().isEmpty()); + + final CopyRequestEntity copyRequestEntityTwo = new CopyRequestEntity(); + copyRequestEntityTwo.setProcessors(Set.of(processorTwoId)); + copyResponseEntity = serviceFacadeSpy.copyComponents(groupId, copyRequestEntityTwo); + + assertNotNull(copyResponseEntity); + assertEquals(1, copyResponseEntity.getProcessors().size()); + assertEquals(processorTwoId, copyResponseEntity.getProcessors().iterator().next().getInstanceIdentifier()); + assertEquals(1, copyResponseEntity.getExternalControllerServiceReferences().size()); + assertTrue(copyResponseEntity.getParameterContexts().isEmpty()); + assertTrue(copyResponseEntity.getParameterProviders().isEmpty()); + } + + @Test + public void testPasteComponents() { + // set the user + final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(new Builder().identity(USER_1).build())); + SecurityContextHolder.getContext().setAuthentication(authentication); + + final String groupId = "groupId"; + final String seed = "seed"; + + final String sensitiveValue = "sensitiveValue"; + final String sensitiveProperty = "sensitiveProperty"; + + final FlowManager flowManager = mock(FlowManager.class); + when(flowController.getFlowManager()).thenReturn(flowManager); + + final RevisionManager revisionManager = new NaiveRevisionManager(); + serviceFacade.setRevisionManager(revisionManager); + + final Map<String, String> properties = new HashMap<>(); + properties.put(sensitiveProperty, null); + + final String instanceId = "67890"; + final VersionedProcessor versionedProcessor = new VersionedProcessor(); + versionedProcessor.setIdentifier("12345"); + versionedProcessor.setInstanceIdentifier(instanceId); + versionedProcessor.setProperties(properties); + + final PropertyDescriptor propertyDescriptor = mock(PropertyDescriptor.class); + when(propertyDescriptor.getName()).thenReturn(sensitiveProperty); + when(propertyDescriptor.isSensitive()).thenReturn(true); + + final Map<PropertyDescriptor, PropertyConfiguration> copiedInstanceProperties = new HashMap<>(); + copiedInstanceProperties.put(propertyDescriptor, null); + + final ProcessorNode copiedInstance = mock(ProcessorNode.class); + when(copiedInstance.getProperties()).thenReturn(copiedInstanceProperties); + when(copiedInstance.getRawPropertyValue(propertyDescriptor)).thenReturn(sensitiveValue); + when(flowManager.getProcessorNode(eq(instanceId))).thenReturn(copiedInstance); + + final VersionedComponentAdditions additions = new VersionedComponentAdditions.Builder() + .setProcessors(Set.of(versionedProcessor)) + .build(); + + when(processGroupDAO.addVersionedComponents(groupId, additions, seed)).thenReturn(new ComponentAdditions.Builder().build()); + + final ArgumentCaptor<VersionedComponentAdditions> additionsCaptor = ArgumentCaptor.forClass(VersionedComponentAdditions.class); + + serviceFacade.pasteComponents(new Revision(0l, "", groupId), groupId, additions, seed); + + verify(processGroupDAO).addVersionedComponents(eq(groupId), additionsCaptor.capture(), eq(seed)); + final VersionedComponentAdditions capturedAdditions = additionsCaptor.getValue(); + + // verify the sensitive value was mapped to the local instance + assertEquals(sensitiveValue, capturedAdditions.getProcessors().iterator().next().getProperties().get(propertyDescriptor.getName())); + } + @Test public void testGetCurrentFlowSnapshotByGroupId() { final String groupId = UUID.randomUUID().toString();
nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java+12 −18 modified@@ -57,7 +57,6 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.ReportingTaskDTO; import org.apache.nifi.web.api.dto.RevisionDTO; -import org.apache.nifi.web.api.dto.SnippetDTO; import org.apache.nifi.web.api.dto.VerifyConfigRequestDTO; import org.apache.nifi.web.api.dto.VersionControlInformationDTO; import org.apache.nifi.web.api.dto.VersionedFlowDTO; @@ -75,14 +74,14 @@ import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ControllerServiceRunStatusEntity; import org.apache.nifi.web.api.entity.ControllerServicesEntity; -import org.apache.nifi.web.api.entity.CopySnippetRequestEntity; +import org.apache.nifi.web.api.entity.CopyRequestEntity; +import org.apache.nifi.web.api.entity.CopyResponseEntity; import org.apache.nifi.web.api.entity.CountersEntity; import org.apache.nifi.web.api.entity.DropRequestEntity; import org.apache.nifi.web.api.entity.FlowAnalysisRuleEntity; import org.apache.nifi.web.api.entity.FlowAnalysisRuleRunStatusEntity; import org.apache.nifi.web.api.entity.FlowAnalysisRulesEntity; import org.apache.nifi.web.api.entity.FlowComparisonEntity; -import org.apache.nifi.web.api.entity.FlowEntity; import org.apache.nifi.web.api.entity.FlowFileEntity; import org.apache.nifi.web.api.entity.FlowRegistryClientEntity; import org.apache.nifi.web.api.entity.ListingRequestEntity; @@ -99,6 +98,8 @@ import org.apache.nifi.web.api.entity.ParameterProviderParameterApplicationEntity; import org.apache.nifi.web.api.entity.ParameterProviderParameterFetchEntity; import org.apache.nifi.web.api.entity.ParameterProvidersEntity; +import org.apache.nifi.web.api.entity.PasteRequestEntity; +import org.apache.nifi.web.api.entity.PasteResponseEntity; import org.apache.nifi.web.api.entity.PortEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity; @@ -110,7 +111,6 @@ import org.apache.nifi.web.api.entity.ReportingTaskRunStatusEntity; import org.apache.nifi.web.api.entity.ReportingTasksEntity; import org.apache.nifi.web.api.entity.ScheduleComponentsEntity; -import org.apache.nifi.web.api.entity.SnippetEntity; import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity; import org.apache.nifi.web.api.entity.VerifyConfigRequestEntity; import org.apache.nifi.web.api.entity.VersionControlInformationEntity; @@ -2173,22 +2173,16 @@ public String getVersionedFlowState(final String groupId, final String parentGro .orElse(null); } - public FlowEntity copyAndPaste(final ProcessGroupEntity pgEntity, final String destinationGroupId) throws NiFiClientException, IOException { - final SnippetDTO snippetDto = new SnippetDTO(); - snippetDto.setProcessGroups(Collections.singletonMap(pgEntity.getId(), pgEntity.getRevision())); - snippetDto.setParentGroupId(pgEntity.getComponent().getParentGroupId()); + public FlowDTO copyAndPaste(final String sourceGroupId, final CopyRequestEntity copyRequestEntity, final RevisionDTO revisionDTO, + final String destinationGroupId) throws NiFiClientException, IOException { + final CopyResponseEntity copyResponseEntity = nifiClient.getProcessGroupClient().copy(sourceGroupId, copyRequestEntity); - final SnippetEntity snippetEntity = new SnippetEntity(); - snippetEntity.setSnippet(snippetDto); + final PasteRequestEntity pasteRequestEntity = new PasteRequestEntity(); + pasteRequestEntity.setCopyResponse(copyResponseEntity); + pasteRequestEntity.setRevision(revisionDTO); - final SnippetEntity createdSnippetEntity = nifiClient.getSnippetClient().createSnippet(snippetEntity); - - final CopySnippetRequestEntity requestEntity = new CopySnippetRequestEntity(); - requestEntity.setOriginX(0D); - requestEntity.setOriginY(0D); - requestEntity.setSnippetId(createdSnippetEntity.getSnippet().getId()); - - return nifiClient.getProcessGroupClient().copySnippet(destinationGroupId, requestEntity); + final PasteResponseEntity pasteResponseEntity = nifiClient.getProcessGroupClient().paste(destinationGroupId, pasteRequestEntity); + return pasteResponseEntity.getFlow(); } public ConnectionEntity setFifoPrioritizer(final ConnectionEntity connectionEntity) throws NiFiClientException, IOException {
nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/CopyPasteIT.java+405 −0 added@@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.tests.system.pg; + +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.client.NiFiClientException; +import org.apache.nifi.web.api.dto.VersionControlInformationDTO; +import org.apache.nifi.web.api.dto.flow.FlowDTO; +import org.apache.nifi.web.api.entity.ControllerServiceEntity; +import org.apache.nifi.web.api.entity.CopyRequestEntity; +import org.apache.nifi.web.api.entity.CopyResponseEntity; +import org.apache.nifi.web.api.entity.FlowRegistryClientEntity; +import org.apache.nifi.web.api.entity.ParameterContextEntity; +import org.apache.nifi.web.api.entity.PasteRequestEntity; +import org.apache.nifi.web.api.entity.PasteResponseEntity; +import org.apache.nifi.web.api.entity.PortEntity; +import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.apache.nifi.web.api.entity.VersionControlInformationEntity; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import static org.apache.nifi.tests.system.registry.RegistryClientIT.FIRST_FLOW_ID; +import static org.apache.nifi.tests.system.registry.RegistryClientIT.TEST_FLOWS_BUCKET; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class CopyPasteIT extends NiFiSystemIT { + + @Test + public void testSimpleCopyPaste() throws NiFiClientException, IOException { + final ProcessGroupEntity topLevel = getClientUtil().createProcessGroup("parent group", "root"); + final ProcessGroupEntity childGroup = getClientUtil().createProcessGroup("child group", topLevel.getId()); + + final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile", childGroup.getId()); + final ProcessorEntity count = getClientUtil().createProcessor("CountEvents", childGroup.getId()); + getClientUtil().setAutoTerminatedRelationships(count, "success"); + + getClientUtil().createConnection(generate, count, "success"); + + final CopyRequestEntity copyRequestEntity = new CopyRequestEntity(); + copyRequestEntity.setProcessGroups(Set.of(childGroup.getId())); + + final FlowDTO flowDto = getClientUtil().copyAndPaste(topLevel.getId(), copyRequestEntity, topLevel.getRevision(), topLevel.getId()); + final ProcessGroupEntity pastedProcessGroup = flowDto.getProcessGroups().iterator().next(); + + assertNotNull(pastedProcessGroup); + assertEquals("child group", pastedProcessGroup.getComponent().getName()); + + final ProcessGroupFlowEntity pastedGroupFlowEntity = getNifiClient().getFlowClient().getProcessGroup(pastedProcessGroup.getId()); + final FlowDTO childFlowDto = pastedGroupFlowEntity.getProcessGroupFlow().getFlow(); + assertEquals(2, childFlowDto.getProcessors().size()); + assertEquals(1, childFlowDto.getConnections().size()); + } + + @Test + public void testPortNameUniquenessCopyPaste() throws NiFiClientException, IOException { + final ProcessGroupEntity topLevel = getClientUtil().createProcessGroup("parent group", "root"); + final PortEntity in = getClientUtil().createInputPort("in", topLevel.getId()); + + final CopyRequestEntity copyRequestEntity = new CopyRequestEntity(); + copyRequestEntity.setInputPorts(Set.of(in.getId())); + + // paste into the current group where the port must be renamed to ensure uniqueness + final FlowDTO renamePortFlowDto = getClientUtil().copyAndPaste(topLevel.getId(), copyRequestEntity, topLevel.getRevision(), topLevel.getId()); + final PortEntity renamedPastedPort = renamePortFlowDto.getInputPorts().iterator().next(); + + assertNotNull(renamedPastedPort); + assertTrue(Pattern.matches("in \\([a-f0-9\\-]{36}\\)", renamedPastedPort.getComponent().getName())); + + final ProcessGroupEntity childGroup = getClientUtil().createProcessGroup("child group", topLevel.getId()); + + // paste into a child group where the port name will not conflict, and it's proposed name will not change + final FlowDTO notRenamePortFlowDto = getClientUtil().copyAndPaste(topLevel.getId(), copyRequestEntity, childGroup.getRevision(), childGroup.getId()); + final PortEntity notRenamedPastedPort = notRenamePortFlowDto.getInputPorts().iterator().next(); + + assertNotNull(notRenamedPastedPort); + assertEquals("in", notRenamedPastedPort.getComponent().getName()); + } + + @Test + public void testPastedPortInRootGroupMustBePublic() throws NiFiClientException, IOException { + final ProcessGroupEntity root = getNifiClient().getProcessGroupClient().getProcessGroup("root"); + final ProcessGroupEntity topLevel = getClientUtil().createProcessGroup("parent group", root.getId()); + final PortEntity in = getClientUtil().createInputPort("in", topLevel.getId()); + + final CopyRequestEntity copyRequestEntity = new CopyRequestEntity(); + copyRequestEntity.setInputPorts(Set.of(in.getId())); + + final CopyResponseEntity copyResponseEntity = getNifiClient().getProcessGroupClient().copy(topLevel.getId(), copyRequestEntity); + assertEquals(1, copyResponseEntity.getInputPorts().size()); + assertFalse(copyResponseEntity.getInputPorts().iterator().next().getAllowRemoteAccess()); + + final PasteRequestEntity pasteRequestEntity = new PasteRequestEntity(); + pasteRequestEntity.setCopyResponse(copyResponseEntity); + pasteRequestEntity.setRevision(root.getRevision()); + + final PasteResponseEntity pasteResponseEntity = getNifiClient().getProcessGroupClient().paste(root.getId(), pasteRequestEntity); + final FlowDTO flowDto = pasteResponseEntity.getFlow(); + assertEquals(1, flowDto.getInputPorts().size()); + assertTrue(flowDto.getInputPorts().iterator().next().getComponent().getAllowRemoteAccess()); + } + + @Test + public void testPastedPortInChildGroupHonorsCopiedPayload() throws NiFiClientException, IOException { + final ProcessGroupEntity root = getNifiClient().getProcessGroupClient().getProcessGroup("root"); + final ProcessGroupEntity topLevel = getClientUtil().createProcessGroup("parent group", root.getId()); + final PortEntity in = getClientUtil().createInputPort("in", topLevel.getId()); + + final CopyRequestEntity copyRequestEntity = new CopyRequestEntity(); + copyRequestEntity.setInputPorts(Set.of(in.getId())); + + // not public + final CopyResponseEntity copyResponseEntity = getNifiClient().getProcessGroupClient().copy(topLevel.getId(), copyRequestEntity); + assertEquals(1, copyResponseEntity.getInputPorts().size()); + assertFalse(copyResponseEntity.getInputPorts().iterator().next().getAllowRemoteAccess()); + + PasteRequestEntity pasteRequestEntity = new PasteRequestEntity(); + pasteRequestEntity.setCopyResponse(copyResponseEntity); + pasteRequestEntity.setRevision(topLevel.getRevision()); + + PasteResponseEntity pasteResponseEntity = getNifiClient().getProcessGroupClient().paste(topLevel.getId(), pasteRequestEntity); + FlowDTO flowDto = pasteResponseEntity.getFlow(); + assertEquals(1, flowDto.getInputPorts().size()); + assertNull(flowDto.getInputPorts().iterator().next().getComponent().getAllowRemoteAccess()); + + // public + copyResponseEntity.getInputPorts().iterator().next().setAllowRemoteAccess(true); + + pasteRequestEntity.setCopyResponse(copyResponseEntity); + pasteRequestEntity.setRevision(topLevel.getRevision()); + + pasteResponseEntity = getNifiClient().getProcessGroupClient().paste(topLevel.getId(), pasteRequestEntity); + flowDto = pasteResponseEntity.getFlow(); + assertEquals(1, flowDto.getInputPorts().size()); + assertTrue(flowDto.getInputPorts().iterator().next().getComponent().getAllowRemoteAccess()); + } + + @Test + public void testGhostComponent() throws NiFiClientException, IOException { + final ProcessGroupEntity topLevel = getClientUtil().createProcessGroup("parent group", "root"); + final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile", topLevel.getId()); + + final CopyRequestEntity copyRequestEntity = new CopyRequestEntity(); + copyRequestEntity.setProcessors(Set.of(generate.getId())); + + final CopyResponseEntity copyResponseEntity = getNifiClient().getProcessGroupClient().copy(topLevel.getId(), copyRequestEntity); + assertEquals(1, copyResponseEntity.getProcessors().size()); + + // update the type to something unknown + copyResponseEntity.getProcessors().iterator().next().setType("NotARealExtensionType"); + + final PasteRequestEntity pasteRequestEntity = new PasteRequestEntity(); + pasteRequestEntity.setCopyResponse(copyResponseEntity); + pasteRequestEntity.setRevision(topLevel.getRevision()); + + final PasteResponseEntity pasteResponseEntity = getNifiClient().getProcessGroupClient().paste(topLevel.getId(), pasteRequestEntity); + final FlowDTO flowDto = pasteResponseEntity.getFlow(); + assertEquals(1, flowDto.getProcessors().size()); + assertTrue(flowDto.getProcessors().iterator().next().getComponent().getExtensionMissing()); + } + + @Test + public void testExternalControllerService() throws NiFiClientException, IOException { + final ProcessGroupEntity topLevel = getClientUtil().createProcessGroup("parent group", "root"); + final ControllerServiceEntity service = getClientUtil().createControllerService("StandardCountService", topLevel.getId()); + final ProcessorEntity count = getClientUtil().createProcessor("CountFlowFiles", topLevel.getId()); + + // reference the controller service + getClientUtil().updateProcessorProperties(count, Map.of("Count Service", service.getId())); + + final CopyRequestEntity copyRequestEntity = new CopyRequestEntity(); + copyRequestEntity.setProcessors(Set.of(count.getId())); + + final FlowDTO flowDto = getClientUtil().copyAndPaste(topLevel.getId(), copyRequestEntity, topLevel.getRevision(), topLevel.getId()); + assertEquals(1, flowDto.getProcessors().size()); + assertEquals(service.getId(), flowDto.getProcessors().iterator().next().getComponent().getConfig().getProperties().get("Count Service")); + } + + @Test + public void testSensitiveValueCopiedFromInstance() throws NiFiClientException, IOException { + final ProcessGroupEntity topLevel = getClientUtil().createProcessGroup("parent group", "root"); + final ProcessorEntity countEvents = getClientUtil().createProcessor("CountEvents", topLevel.getId()); + + // set a sensitive property + getClientUtil().updateProcessorProperties(countEvents, Map.of("Sensitive", "sensitive value")); + + final CopyRequestEntity copyRequestEntity = new CopyRequestEntity(); + copyRequestEntity.setProcessors(Set.of(countEvents.getId())); + + final CopyResponseEntity copyResponseEntity = getNifiClient().getProcessGroupClient().copy(topLevel.getId(), copyRequestEntity); + assertEquals(1, copyResponseEntity.getProcessors().size()); + assertNull(copyResponseEntity.getProcessors().iterator().next().getProperties().get("Sensitive")); + + final PasteRequestEntity pasteRequestEntity = new PasteRequestEntity(); + pasteRequestEntity.setCopyResponse(copyResponseEntity); + pasteRequestEntity.setRevision(topLevel.getRevision()); + + final PasteResponseEntity pasteResponseEntity = getNifiClient().getProcessGroupClient().paste(topLevel.getId(), pasteRequestEntity); + final FlowDTO flowDto = pasteResponseEntity.getFlow(); + assertEquals(1, flowDto.getProcessors().size()); + assertEquals("********", flowDto.getProcessors().iterator().next().getComponent().getConfig().getProperties().get("Sensitive")); + } + + @Test + public void testSensitiveValueNotCopiedFromInstance() throws NiFiClientException, IOException { + final ProcessGroupEntity topLevel = getClientUtil().createProcessGroup("parent group", "root"); + final ProcessorEntity countEvents = getClientUtil().createProcessor("CountEvents", topLevel.getId()); + + // set a sensitive property + getClientUtil().updateProcessorProperties(countEvents, Map.of("Sensitive", "sensitive value")); + + final CopyRequestEntity copyRequestEntity = new CopyRequestEntity(); + copyRequestEntity.setProcessors(Set.of(countEvents.getId())); + + final CopyResponseEntity copyResponseEntity = getNifiClient().getProcessGroupClient().copy(topLevel.getId(), copyRequestEntity); + assertEquals(1, copyResponseEntity.getProcessors().size()); + assertNull(copyResponseEntity.getProcessors().iterator().next().getProperties().get("Sensitive")); + + // delete the copied component so the sensitive value isn't copied over + getClientUtil().deleteAll(topLevel.getId()); + + final PasteRequestEntity pasteRequestEntity = new PasteRequestEntity(); + pasteRequestEntity.setCopyResponse(copyResponseEntity); + pasteRequestEntity.setRevision(topLevel.getRevision()); + + final PasteResponseEntity pasteResponseEntity = getNifiClient().getProcessGroupClient().paste(topLevel.getId(), pasteRequestEntity); + final FlowDTO flowDto = pasteResponseEntity.getFlow(); + assertEquals(1, flowDto.getProcessors().size()); + assertNull(flowDto.getProcessors().iterator().next().getComponent().getConfig().getProperties().get("Sensitive")); + } + + @Test + public void testParameterContextReferencedCopied() throws NiFiClientException, IOException { + final ProcessGroupEntity topLevel = getClientUtil().createProcessGroup("parent group", "root"); + final ProcessGroupEntity childGroup = getClientUtil().createProcessGroup("child group", topLevel.getId()); + final ParameterContextEntity parameterContextEntity = getClientUtil().createParameterContext("my parameters", "param", "value", false); + getClientUtil().setParameterContext(childGroup.getId(), parameterContextEntity); + + final CopyRequestEntity copyRequestEntity = new CopyRequestEntity(); + copyRequestEntity.setProcessGroups(Set.of(childGroup.getId())); + + final FlowDTO flowDto = getClientUtil().copyAndPaste(topLevel.getId(), copyRequestEntity, topLevel.getRevision(), topLevel.getId()); + assertEquals(1, flowDto.getProcessGroups().size()); + + final ProcessGroupEntity pastedProcessGroup = flowDto.getProcessGroups().iterator().next(); + assertNotNull(pastedProcessGroup.getParameterContext()); + assertEquals(parameterContextEntity.getId(), pastedProcessGroup.getParameterContext().getId()); + } + + @Test + public void testNewParameterContextCreated() throws NiFiClientException, IOException { + final ProcessGroupEntity topLevel = getClientUtil().createProcessGroup("parent group", "root"); + final ProcessGroupEntity childGroup = getClientUtil().createProcessGroup("child group", topLevel.getId()); + final ParameterContextEntity parameterContextEntity = getClientUtil().createParameterContext("my parameters", "param", "value", false); + getClientUtil().setParameterContext(childGroup.getId(), parameterContextEntity); + + final CopyRequestEntity copyRequestEntity = new CopyRequestEntity(); + copyRequestEntity.setProcessGroups(Set.of(childGroup.getId())); + + final CopyResponseEntity copyResponseEntity = getNifiClient().getProcessGroupClient().copy(topLevel.getId(), copyRequestEntity); + assertEquals(1, copyResponseEntity.getProcessGroups().size()); + + // delete the parameter contexts before pasting + getClientUtil().deleteParameterContexts(); + + final PasteRequestEntity pasteRequestEntity = new PasteRequestEntity(); + pasteRequestEntity.setCopyResponse(copyResponseEntity); + pasteRequestEntity.setRevision(topLevel.getRevision()); + + final PasteResponseEntity pasteResponseEntity = getNifiClient().getProcessGroupClient().paste(topLevel.getId(), pasteRequestEntity); + final FlowDTO flowDto = pasteResponseEntity.getFlow(); + + assertEquals(1, flowDto.getProcessGroups().size()); + + final ProcessGroupEntity pastedProcessGroup = flowDto.getProcessGroups().iterator().next(); + assertNotNull(pastedProcessGroup.getParameterContext()); + assertNotEquals(parameterContextEntity.getId(), pastedProcessGroup.getParameterContext().getId()); + } + + @Test + public void testCopyPasteProcessGroupDoesNotDuplicateVersionedComponentId() throws NiFiClientException, IOException { + // Create a top-level PG and version it with nothing in it. + final FlowRegistryClientEntity clientEntity = registerClient(); + final ProcessGroupEntity outerGroup = getClientUtil().createProcessGroup("Outer", "root"); + getClientUtil().startVersionControl(outerGroup, clientEntity, TEST_FLOWS_BUCKET, FIRST_FLOW_ID); + + // Create a lower level PG and add a Processor. + // Commit as Version 2 of the group. + final ProcessGroupEntity inner1 = getClientUtil().createProcessGroup("Inner 1", outerGroup.getId()); + ProcessorEntity terminate1 = getClientUtil().createProcessor("TerminateFlowFile", inner1.getId()); + VersionControlInformationEntity vciEntity = getClientUtil().startVersionControl(outerGroup, clientEntity, TEST_FLOWS_BUCKET, FIRST_FLOW_ID); + assertEquals("2", vciEntity.getVersionControlInformation().getVersion()); + + // Get an up-to-date copy of terminate1 because it should now have a non-null versioned component id + terminate1 = getNifiClient().getProcessorClient().getProcessor(terminate1.getId()); + assertNotNull(terminate1.getComponent().getVersionedComponentId()); + + // Build the copy request + final CopyRequestEntity copyRequestEntity = new CopyRequestEntity(); + copyRequestEntity.setProcessGroups(Set.of(inner1.getId())); + + // Copy and paste the inner Process Group + final FlowDTO flowDto = getClientUtil().copyAndPaste(outerGroup.getId(), copyRequestEntity, outerGroup.getRevision(), outerGroup.getId()); + final ProcessGroupEntity inner2Entity = flowDto.getProcessGroups().iterator().next(); + + final ProcessGroupFlowEntity inner2FlowEntity = getNifiClient().getFlowClient().getProcessGroup(inner2Entity.getId()); + final Set<ProcessorEntity> inner2FlowProcessors = inner2FlowEntity.getProcessGroupFlow().getFlow().getProcessors(); + assertEquals(1, inner2FlowProcessors.size()); + + ProcessorEntity terminate2 = inner2FlowProcessors.iterator().next(); + assertEquals(terminate1.getComponent().getName(), terminate2.getComponent().getName()); + assertEquals(terminate1.getComponent().getType(), terminate2.getComponent().getType()); + assertNotEquals(terminate1.getComponent().getId(), terminate2.getComponent().getId()); + assertNotEquals(terminate1.getComponent().getVersionedComponentId(), terminate2.getComponent().getVersionedComponentId()); + + // First Control again with the newly created components + vciEntity = getClientUtil().startVersionControl(outerGroup, clientEntity, TEST_FLOWS_BUCKET, FIRST_FLOW_ID); + assertEquals("3", vciEntity.getVersionControlInformation().getVersion()); + + // Get new version of terminate2 processor and terminate1 processor. Ensure that both have version control ID's but that they are different. + terminate1 = getNifiClient().getProcessorClient().getProcessor(terminate1.getId()); + terminate2 = getNifiClient().getProcessorClient().getProcessor(terminate2.getId()); + + assertNotNull(terminate1.getComponent().getVersionedComponentId()); + assertNotNull(terminate2.getComponent().getVersionedComponentId()); + assertNotEquals(terminate1.getComponent().getVersionedComponentId(), terminate2.getComponent().getVersionedComponentId()); + } + + @Test + public void testCopyPasteProcessGroupUnderVersionControlMaintainsVersionedComponentId() throws NiFiClientException, IOException, InterruptedException { + // Create a top-level PG and version it with nothing in it. + final FlowRegistryClientEntity clientEntity = registerClient(); + final ProcessGroupEntity topLevel1 = getClientUtil().createProcessGroup("Top Level 1", "root"); + + // Create a lower level PG and add a Processor. + // Commit as Version 2 of the group. + final ProcessGroupEntity innerGroup = getClientUtil().createProcessGroup("Inner 1", topLevel1.getId()); + ProcessorEntity terminate1 = getClientUtil().createProcessor("TerminateFlowFile", innerGroup.getId()); + VersionControlInformationEntity vciEntity = getClientUtil().startVersionControl(innerGroup, clientEntity, TEST_FLOWS_BUCKET, FIRST_FLOW_ID); + assertEquals("1", vciEntity.getVersionControlInformation().getVersion()); + + // Now that the inner group is under version control, copy it and paste it to a new PG. + // This should result in the pasted Process Group having a processor with the same Versioned Component ID, because the Processors + // have different Versioned groups, so they can have duplicate Versioned Component IDs. + final ProcessGroupEntity topLevel2 = getClientUtil().createProcessGroup("Top Level 2", "root"); + + // Build the request and copy and paste + final CopyRequestEntity copyRequestEntity = new CopyRequestEntity(); + copyRequestEntity.setProcessGroups(Set.of(innerGroup.getId())); + + final FlowDTO flowDto = getClientUtil().copyAndPaste(topLevel1.getId(), copyRequestEntity, topLevel2.getRevision(), topLevel2.getId()); + final String pastedGroupId = flowDto.getProcessGroups().iterator().next().getId(); + final ProcessGroupFlowEntity pastedGroupFlowEntity = getNifiClient().getFlowClient().getProcessGroup(pastedGroupId); + final ProcessorEntity terminate2 = pastedGroupFlowEntity.getProcessGroupFlow().getFlow().getProcessors().iterator().next(); + + // Get an up-to-date copy of terminate1 because it should now have a non-null versioned component id + terminate1 = getNifiClient().getProcessorClient().getProcessor(terminate1.getId()); + assertNotNull(terminate1.getComponent().getVersionedComponentId()); + + // Both the pasted Process Group and the original should have the same Version Control Information. + final VersionControlInformationDTO originalGroupVci = getNifiClient().getProcessGroupClient().getProcessGroup(innerGroup.getId()).getComponent().getVersionControlInformation(); + final VersionControlInformationDTO pastedGroupVci = getNifiClient().getProcessGroupClient().getProcessGroup(pastedGroupId).getComponent().getVersionControlInformation(); + assertNotNull(originalGroupVci); + assertNotNull(pastedGroupVci); + assertEquals(originalGroupVci.getBucketId(), pastedGroupVci.getBucketId()); + assertEquals(originalGroupVci.getFlowId(), pastedGroupVci.getFlowId()); + assertEquals(originalGroupVci.getVersion(), pastedGroupVci.getVersion()); + + // Wait for the Version Control Information to show a state of UP_TO_DATE. We have to wait for this because it initially is set to SYNC_FAILURE and a background task + // is kicked off to determine the state. + waitFor(() -> VersionControlInformationDTO.UP_TO_DATE.equals(getClientUtil().getVersionControlState(innerGroup.getId())) ); + waitFor(() -> VersionControlInformationDTO.UP_TO_DATE.equals(getClientUtil().getVersionControlState(pastedGroupId)) ); + + // The two processors should have the same Versioned Component ID + assertEquals(terminate1.getComponent().getName(), terminate2.getComponent().getName()); + assertEquals(terminate1.getComponent().getType(), terminate2.getComponent().getType()); + assertNotEquals(terminate1.getComponent().getId(), terminate2.getComponent().getId()); + assertEquals(terminate1.getComponent().getVersionedComponentId(), terminate2.getComponent().getVersionedComponentId()); + } +}
nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java+2 −98 modified@@ -25,16 +25,13 @@ import org.apache.nifi.web.api.dto.ProcessorDTO; import org.apache.nifi.web.api.dto.RevisionDTO; import org.apache.nifi.web.api.dto.SnippetDTO; -import org.apache.nifi.web.api.dto.VersionControlInformationDTO; import org.apache.nifi.web.api.dto.flow.FlowDTO; import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO; import org.apache.nifi.web.api.entity.ConnectionEntity; import org.apache.nifi.web.api.entity.ControllerServiceEntity; -import org.apache.nifi.web.api.entity.FlowEntity; import org.apache.nifi.web.api.entity.FlowRegistryClientEntity; import org.apache.nifi.web.api.entity.PortEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; -import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.SnippetEntity; import org.apache.nifi.web.api.entity.VersionControlInformationEntity; @@ -56,9 +53,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class RegistryClientIT extends NiFiSystemIT { - private static final String TEST_FLOWS_BUCKET = "test-flows"; + public static final String TEST_FLOWS_BUCKET = "test-flows"; - private static final String FIRST_FLOW_ID = "first-flow"; + public static final String FIRST_FLOW_ID = "first-flow"; /** * Test a scenario where we have Parent Process Group with a child process group. The child group is under Version Control. @@ -372,97 +369,4 @@ public void testStartVersionControlThenModifyAndRevert() throws NiFiClientExcept assertEquals("UP_TO_DATE", versionedFlowState); } - - @Test - public void testCopyPasteProcessGroupDoesNotDuplicateVersionedComponentId() throws NiFiClientException, IOException { - // Create a top-level PG and version it with nothing in it. - final FlowRegistryClientEntity clientEntity = registerClient(); - final ProcessGroupEntity outerGroup = getClientUtil().createProcessGroup("Outer", "root"); - getClientUtil().startVersionControl(outerGroup, clientEntity, TEST_FLOWS_BUCKET, FIRST_FLOW_ID); - - // Create a lower level PG and add a Processor. - // Commit as Version 2 of the group. - final ProcessGroupEntity inner1 = getClientUtil().createProcessGroup("Inner 1", outerGroup.getId()); - ProcessorEntity terminate1 = getClientUtil().createProcessor("TerminateFlowFile", inner1.getId()); - VersionControlInformationEntity vciEntity = getClientUtil().startVersionControl(outerGroup, clientEntity, TEST_FLOWS_BUCKET, FIRST_FLOW_ID); - assertEquals("2", vciEntity.getVersionControlInformation().getVersion()); - - // Get an up-to-date copy of terminate1 because it should now have a non-null versioned component id - terminate1 = getNifiClient().getProcessorClient().getProcessor(terminate1.getId()); - assertNotNull(terminate1.getComponent().getVersionedComponentId()); - - // Copy and paste the inner Process Group - final FlowEntity flowEntity = getClientUtil().copyAndPaste(inner1, outerGroup.getId()); - final ProcessGroupEntity inner2Entity = flowEntity.getFlow().getProcessGroups().iterator().next(); - - final ProcessGroupFlowEntity inner2FlowEntity = getNifiClient().getFlowClient().getProcessGroup(inner2Entity.getId()); - final Set<ProcessorEntity> inner2FlowProcessors = inner2FlowEntity.getProcessGroupFlow().getFlow().getProcessors(); - assertEquals(1, inner2FlowProcessors.size()); - - ProcessorEntity terminate2 = inner2FlowProcessors.iterator().next(); - assertEquals(terminate1.getComponent().getName(), terminate2.getComponent().getName()); - assertEquals(terminate1.getComponent().getType(), terminate2.getComponent().getType()); - assertNotEquals(terminate1.getComponent().getId(), terminate2.getComponent().getId()); - assertNotEquals(terminate1.getComponent().getVersionedComponentId(), terminate2.getComponent().getVersionedComponentId()); - - // First Control again with the newly created components - vciEntity = getClientUtil().startVersionControl(outerGroup, clientEntity, TEST_FLOWS_BUCKET, FIRST_FLOW_ID); - assertEquals("3", vciEntity.getVersionControlInformation().getVersion()); - - // Get new version of terminate2 processor and terminate1 processor. Ensure that both have version control ID's but that they are different. - terminate1 = getNifiClient().getProcessorClient().getProcessor(terminate1.getId()); - terminate2 = getNifiClient().getProcessorClient().getProcessor(terminate2.getId()); - - assertNotNull(terminate1.getComponent().getVersionedComponentId()); - assertNotNull(terminate2.getComponent().getVersionedComponentId()); - assertNotEquals(terminate1.getComponent().getVersionedComponentId(), terminate2.getComponent().getVersionedComponentId()); - } - - @Test - public void testCopyPasteProcessGroupUnderVersionControlMaintainsVersionedComponentId() throws NiFiClientException, IOException, InterruptedException { - // Create a top-level PG and version it with nothing in it. - final FlowRegistryClientEntity clientEntity = registerClient(); - final ProcessGroupEntity topLevel1 = getClientUtil().createProcessGroup("Top Level 1", "root"); - - // Create a lower level PG and add a Processor. - // Commit as Version 2 of the group. - final ProcessGroupEntity innerGroup = getClientUtil().createProcessGroup("Inner 1", topLevel1.getId()); - ProcessorEntity terminate1 = getClientUtil().createProcessor("TerminateFlowFile", innerGroup.getId()); - VersionControlInformationEntity vciEntity = getClientUtil().startVersionControl(innerGroup, clientEntity, TEST_FLOWS_BUCKET, FIRST_FLOW_ID); - assertEquals("1", vciEntity.getVersionControlInformation().getVersion()); - - // Now that the inner group is under version control, copy it and paste it to a new PG. - // This should result in the pasted Process Group having a processor with the same Versioned Component ID, because the Processors - // have different Versioned groups, so they can have duplicate Versioned Component IDs. - final ProcessGroupEntity topLevel2 = getClientUtil().createProcessGroup("Top Level 2", "root"); - final FlowEntity flowEntity = getClientUtil().copyAndPaste(innerGroup, topLevel2.getId()); - final String pastedGroupId = flowEntity.getFlow().getProcessGroups().iterator().next().getId(); - final ProcessGroupFlowEntity pastedGroupFlowEntity = getNifiClient().getFlowClient().getProcessGroup(pastedGroupId); - final ProcessorEntity terminate2 = pastedGroupFlowEntity.getProcessGroupFlow().getFlow().getProcessors().iterator().next(); - - // Get an up-to-date copy of terminate1 because it should now have a non-null versioned component id - terminate1 = getNifiClient().getProcessorClient().getProcessor(terminate1.getId()); - assertNotNull(terminate1.getComponent().getVersionedComponentId()); - - // Both the pasted Process Group and the original should have the same Version Control Information. - final VersionControlInformationDTO originalGroupVci = getNifiClient().getProcessGroupClient().getProcessGroup(innerGroup.getId()).getComponent().getVersionControlInformation(); - final VersionControlInformationDTO pastedGroupVci = getNifiClient().getProcessGroupClient().getProcessGroup(pastedGroupId).getComponent().getVersionControlInformation(); - assertNotNull(originalGroupVci); - assertNotNull(pastedGroupVci); - assertEquals(originalGroupVci.getBucketId(), pastedGroupVci.getBucketId()); - assertEquals(originalGroupVci.getFlowId(), pastedGroupVci.getFlowId()); - assertEquals(originalGroupVci.getVersion(), pastedGroupVci.getVersion()); - - // Wait for the Version Control Information to show a state of UP_TO_DATE. We have to wait for this because it initially is set to SYNC_FAILURE and a background task - // is kicked off to determine the state. - waitFor(() -> VersionControlInformationDTO.UP_TO_DATE.equals(getClientUtil().getVersionControlState(innerGroup.getId())) ); - waitFor(() -> VersionControlInformationDTO.UP_TO_DATE.equals(getClientUtil().getVersionControlState(pastedGroupId)) ); - - // The two processors should have the same Versioned Component ID - assertEquals(terminate1.getComponent().getName(), terminate2.getComponent().getName()); - assertEquals(terminate1.getComponent().getType(), terminate2.getComponent().getType()); - assertNotEquals(terminate1.getComponent().getId(), terminate2.getComponent().getId()); - assertEquals(terminate1.getComponent().getVersionedComponentId(), terminate2.getComponent().getVersionedComponentId()); - } - }
nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessGroupClient.java+45 −0 modified@@ -25,10 +25,14 @@ import org.apache.nifi.toolkit.client.ProcessGroupClient; import org.apache.nifi.toolkit.client.RequestConfig; import org.apache.nifi.web.api.entity.ControllerServiceEntity; +import org.apache.nifi.web.api.entity.CopyRequestEntity; +import org.apache.nifi.web.api.entity.CopyResponseEntity; import org.apache.nifi.web.api.entity.CopySnippetRequestEntity; import org.apache.nifi.web.api.entity.DropRequestEntity; import org.apache.nifi.web.api.entity.FlowComparisonEntity; import org.apache.nifi.web.api.entity.FlowEntity; +import org.apache.nifi.web.api.entity.PasteRequestEntity; +import org.apache.nifi.web.api.entity.PasteResponseEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; import org.apache.nifi.web.api.entity.ProcessGroupImportEntity; import org.apache.nifi.web.api.entity.ProcessGroupReplaceRequestEntity; @@ -333,4 +337,45 @@ private File getFileContent(final Response response, final File outputFile) { } } + @Override + public CopyResponseEntity copy(String processGroupId, CopyRequestEntity copyRequestEntity) throws NiFiClientException, IOException { + if (StringUtils.isBlank(processGroupId)) { + throw new IllegalArgumentException("Process group id cannot be null or blank"); + } + + if (copyRequestEntity == null) { + throw new IllegalArgumentException("Copy Request Entity cannot be null"); + } + + return executeAction("Error copying", () -> { + final WebTarget target = processGroupsTarget + .path("{id}/copy") + .resolveTemplate("id", processGroupId); + + return getRequestBuilder(target).post( + Entity.entity(copyRequestEntity, MediaType.APPLICATION_JSON_TYPE), + CopyResponseEntity.class); + }); + } + + @Override + public PasteResponseEntity paste(String processGroupId, PasteRequestEntity pasteRequestEntity) throws NiFiClientException, IOException { + if (StringUtils.isBlank(processGroupId)) { + throw new IllegalArgumentException("Process group id cannot be null or blank"); + } + + if (pasteRequestEntity == null) { + throw new IllegalArgumentException("Paste Request Entity cannot be null"); + } + + return executeAction("Error pasting", () -> { + final WebTarget target = processGroupsTarget + .path("{id}/paste") + .resolveTemplate("id", processGroupId); + + return getRequestBuilder(target).put( + Entity.entity(pasteRequestEntity, MediaType.APPLICATION_JSON_TYPE), + PasteResponseEntity.class); + }); + } } \ No newline at end of file
nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessGroupClient.java+8 −0 modified@@ -17,10 +17,14 @@ package org.apache.nifi.toolkit.client; import org.apache.nifi.web.api.entity.ControllerServiceEntity; +import org.apache.nifi.web.api.entity.CopyRequestEntity; +import org.apache.nifi.web.api.entity.CopyResponseEntity; import org.apache.nifi.web.api.entity.CopySnippetRequestEntity; import org.apache.nifi.web.api.entity.DropRequestEntity; import org.apache.nifi.web.api.entity.FlowComparisonEntity; import org.apache.nifi.web.api.entity.FlowEntity; +import org.apache.nifi.web.api.entity.PasteRequestEntity; +import org.apache.nifi.web.api.entity.PasteResponseEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; import org.apache.nifi.web.api.entity.ProcessGroupImportEntity; import org.apache.nifi.web.api.entity.ProcessGroupReplaceRequestEntity; @@ -67,4 +71,8 @@ FlowEntity copySnippet(String processGroupId, CopySnippetRequestEntity copySnipp DropRequestEntity emptyQueues(String processGroupId) throws NiFiClientException, IOException; DropRequestEntity getEmptyQueuesRequest(String processGroupId, String requestId) throws NiFiClientException, IOException; + + CopyResponseEntity copy(String processGroupId, CopyRequestEntity copyRequestEntity) throws NiFiClientException, IOException; + + PasteResponseEntity paste(String processGroupId, PasteRequestEntity pasteRequestEntity) throws NiFiClientException, IOException; }
Vulnerability mechanics
Generated on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
References
5- github.com/advisories/GHSA-mpj7-7mg7-x95jghsaADVISORY
- lists.apache.org/thread/cjc8fns5kjsho0s7vonlnojokyfx47wnghsavendor-advisoryWEB
- nvd.nist.gov/vuln/detail/CVE-2024-56512ghsaADVISORY
- www.openwall.com/lists/oss-security/2024/12/28/1ghsaWEB
- github.com/apache/nifi/commit/f744deebf9a9effdbbff79ce6073ec329b5f45daghsaWEB
News mentions
0No linked articles in our index yet.