Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ dependencies {
implementation libs.lucene.queryparser
implementation libs.lucene.analysis.common

implementation libs.fastcsv

implementation libs.opendatadiscovery.oddrn
implementation(libs.opendatadiscovery.client) {
exclude group: 'org.springframework.boot', module: 'spring-boot-starter-webflux'
Expand Down
10 changes: 10 additions & 0 deletions api/src/main/java/io/kafbat/ui/config/ClustersProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ public class ClustersProperties {

AdminClient adminClient = new AdminClient();

Csv csv = new Csv();

@Data
public static class Csv {
String lineDelimeter = "crlf";
char quoteCharacter = '"';
String quoteStrategy = "required";
char fieldSeparator = ',';
}

@Data
public static class AdminClient {
Integer timeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,16 @@
import io.kafbat.ui.model.KafkaCluster;
import io.kafbat.ui.model.rbac.AccessContext;
import io.kafbat.ui.service.ClustersStorage;
import io.kafbat.ui.service.CsvWriterService;
import io.kafbat.ui.service.audit.AuditService;
import io.kafbat.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;

Expand All @@ -15,6 +22,7 @@ public abstract class AbstractController {
protected ClustersStorage clustersStorage;
protected AccessControlService accessControlService;
protected AuditService auditService;
protected CsvWriterService csvWriterService;

protected KafkaCluster getCluster(String name) {
return clustersStorage.getClusterByName(name)
Expand Down Expand Up @@ -44,4 +52,22 @@ public void setAccessControlService(AccessControlService accessControlService) {
public void setAuditService(AuditService auditService) {
this.auditService = auditService;
}

public <T extends Flux<R>, R> Mono<ResponseEntity<String>> responseToCsv(ResponseEntity<T> response) {
return responseToCsv(response, (t) -> t);
}

public <T, R> Mono<ResponseEntity<String>> responseToCsv(ResponseEntity<T> response, Function<T, Flux<R>> extract) {
if (response.getStatusCode().is2xxSuccessful()) {
return mapToCsv(extract.apply(response.getBody())).map(ResponseEntity::ok);
} else {
return Mono.just(ResponseEntity.status(response.getStatusCode()).body(
Optional.ofNullable(response.getBody()).map(Object::toString).orElse("")
));
}
}

protected <T> Mono<String> mapToCsv(Flux<T> body) {
return csvWriterService.write(body);
}
}
23 changes: 10 additions & 13 deletions api/src/main/java/io/kafbat/ui/controller/AclsController.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public Mono<ResponseEntity<Void>> deleteAcl(String clusterName, Mono<KafkaAclDTO
.thenReturn(ResponseEntity.ok().build());
}



@Override
public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName,
KafkaAclResourceTypeDTO resourceTypeDto,
Expand Down Expand Up @@ -96,19 +98,14 @@ public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName,
}

@Override
public Mono<ResponseEntity<String>> getAclAsCsv(String clusterName, ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.VIEW)
.operationName("getAclAsCsv")
.build();

return validateAccess(context).then(
aclsService.getAclAsCsvString(getCluster(clusterName))
.map(ResponseEntity::ok)
.flatMap(Mono::just)
.doOnEach(sig -> audit(context, sig))
);
public Mono<ResponseEntity<String>> getAclAsCsv(String clusterName,
KafkaAclResourceTypeDTO resourceType,
String resourceName,
KafkaAclNamePatternTypeDTO namePatternType,
String search, Boolean fts,
ServerWebExchange exchange) {
return listAcls(clusterName, resourceType, resourceName, namePatternType, search, fts, exchange)
.flatMap(this::responseToCsv);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ public Mono<ResponseEntity<Flux<BrokerDTO>>> getBrokers(String clusterName,
.doOnEach(sig -> audit(context, sig));
}

@Override
public Mono<ResponseEntity<String>> getBrokersCsv(String clusterName,
ServerWebExchange exchange) {
return getBrokers(clusterName, exchange).flatMap(this::responseToCsv);
}

@Override
public Mono<ResponseEntity<BrokerMetricsDTO>> getBrokersMetrics(String clusterName, Integer id,
ServerWebExchange exchange) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.kafbat.ui.service.mcp.McpTool;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.function.Supplier;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -95,6 +96,8 @@ public Mono<ResponseEntity<ConsumerGroupDetailsDTO>> getConsumerGroup(String clu
.doOnEach(sig -> audit(context, sig));
}



@Override
public Mono<ResponseEntity<Flux<ConsumerGroupDTO>>> getTopicConsumerGroups(String clusterName,
String topicName,
Expand All @@ -120,6 +123,8 @@ public Mono<ResponseEntity<Flux<ConsumerGroupDTO>>> getTopicConsumerGroups(Strin
.doOnEach(sig -> audit(context, sig));
}



@Override
public Mono<ResponseEntity<ConsumerGroupsPageResponseDTO>> getConsumerGroupsPage(
String clusterName,
Expand All @@ -138,17 +143,51 @@ public Mono<ResponseEntity<ConsumerGroupsPageResponseDTO>> getConsumerGroupsPage
.build();

return validateAccess(context).then(
consumerGroupService.getConsumerGroupsPage(
consumerGroupService.getConsumerGroups(
getCluster(clusterName),
OptionalInt.of(
Optional.ofNullable(page).filter(i -> i > 0).orElse(1)
),
OptionalInt.of(
Optional.ofNullable(perPage).filter(i -> i > 0).orElse(defaultConsumerGroupsPageSize)
),
search,
fts,
Optional.ofNullable(orderBy).orElse(ConsumerGroupOrderingDTO.NAME),
Optional.ofNullable(sortOrderDto).orElse(SortOrderDTO.ASC)
)
.map(this::convertPage)
.map(ResponseEntity::ok)
).doOnEach(sig -> audit(context, sig));
}


@Override
public Mono<ResponseEntity<String>> getConsumerGroupsCsv(String clusterName, Integer page,
Integer perPage, String search,
ConsumerGroupOrderingDTO orderBy,
SortOrderDTO sortOrderDto, Boolean fts,
ServerWebExchange exchange) {

var context = AccessContext.builder()
.cluster(clusterName)
// consumer group access validation is within the service
.operationName("getConsumerGroupsPage")
.build();

return validateAccess(context).then(
consumerGroupService.getConsumerGroups(
getCluster(clusterName),
Optional.ofNullable(page).filter(i -> i > 0).orElse(1),
Optional.ofNullable(perPage).filter(i -> i > 0).orElse(defaultConsumerGroupsPageSize),
OptionalInt.empty(),
OptionalInt.empty(),
search,
fts,
Optional.ofNullable(orderBy).orElse(ConsumerGroupOrderingDTO.NAME),
Optional.ofNullable(sortOrderDto).orElse(SortOrderDTO.ASC)
)
.map(this::convertPage)
.map(ResponseEntity::ok)
.flatMap(r -> responseToCsv(r, (g) -> Flux.fromIterable(g.getConsumerGroups())))
).doOnEach(sig -> audit(context, sig));
}

Expand Down Expand Up @@ -194,7 +233,12 @@ public Mono<ResponseEntity<Void>> resetConsumerGroupOffsets(String clusterName,
);
}
Map<Integer, Long> offsets = reset.getPartitionsOffsets().stream()
.collect(toMap(PartitionOffsetDTO::getPartition, PartitionOffsetDTO::getOffset));
.collect(
toMap(
PartitionOffsetDTO::getPartition,
d -> Optional.ofNullable(d.getOffset()).orElse(0L)
)
);
return offsetsResetService.resetToOffsets(cluster, group, reset.getTopic(), offsets);
default:
return Mono.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.kafbat.ui.model.NewConnectorDTO;
import io.kafbat.ui.model.SortOrderDTO;
import io.kafbat.ui.model.TaskDTO;
import io.kafbat.ui.model.TopicsResponseDTO;
import io.kafbat.ui.model.rbac.AccessContext;
import io.kafbat.ui.model.rbac.permission.ConnectAction;
import io.kafbat.ui.service.KafkaConnectService;
Expand Down Expand Up @@ -56,6 +57,13 @@ public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
return Mono.just(ResponseEntity.ok(availableConnects));
}

@Override
public Mono<ResponseEntity<String>> getConnectsCsv(String clusterName, Boolean withStats,
ServerWebExchange exchange) {
return getConnects(clusterName, withStats, exchange)
.flatMap(this::responseToCsv);
}

@Override
public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, String connectName,
ServerWebExchange exchange) {
Expand Down Expand Up @@ -157,6 +165,15 @@ public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(
.doOnEach(sig -> audit(context, sig));
}

@Override
public Mono<ResponseEntity<String>> getAllConnectorsCsv(String clusterName, String search,
ConnectorColumnsToSortDTO orderBy,
SortOrderDTO sortOrder, Boolean fts,
ServerWebExchange exchange) {
return getAllConnectors(clusterName, search, orderBy, sortOrder, fts, exchange)
.flatMap(this::responseToCsv);
}

@Override
public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfig(String clusterName,
String connectName,
Expand Down
25 changes: 23 additions & 2 deletions api/src/main/java/io/kafbat/ui/controller/TopicsController.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.kafbat.ui.model.TopicUpdateDTO;
import io.kafbat.ui.model.TopicsResponseDTO;
import io.kafbat.ui.model.rbac.AccessContext;
import io.kafbat.ui.model.rbac.permission.ConnectAction;
import io.kafbat.ui.service.KafkaConnectService;
import io.kafbat.ui.service.TopicsService;
import io.kafbat.ui.service.analyze.TopicAnalysisService;
Expand Down Expand Up @@ -186,7 +185,7 @@ public Mono<ResponseEntity<TopicsResponseDTO>> getTopics(String clusterName,
.operationName("getTopics")
.build();

return topicsService.getTopicsForPagination(getCluster(clusterName), search, showInternal, fts)
return topicsService.getTopics(getCluster(clusterName), search, showInternal, fts)
.flatMap(topics -> accessControlService.filterViewableTopics(topics, clusterName))
.flatMap(topics -> {
int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
Expand Down Expand Up @@ -219,6 +218,28 @@ public Mono<ResponseEntity<TopicsResponseDTO>> getTopics(String clusterName,
.doOnEach(sig -> audit(context, sig));
}

@Override
public Mono<ResponseEntity<String>> getTopicsCsv(String clusterName, Boolean showInternal,
String search, TopicColumnsToSortDTO orderBy,
SortOrderDTO sortOrder, Boolean fts,
ServerWebExchange exchange) {

AccessContext context = AccessContext.builder()
.cluster(clusterName)
.operationName("getTopicsCsv")
.build();

ClustersProperties.ClusterFtsProperties ftsProperties = clustersProperties.getFts();
Comparator<InternalTopic> comparatorForTopic = getComparatorForTopic(orderBy, ftsProperties.use(fts));

return topicsService
.getTopics(getCluster(clusterName), search, showInternal, fts)
.flatMap(topics -> accessControlService.filterViewableTopics(topics, clusterName))
.map(topics -> topics.stream().sorted(comparatorForTopic).toList())
.flatMap(topics -> responseToCsv(ResponseEntity.ok(Flux.fromIterable(topics))))
.doOnEach(sig -> audit(context, sig));
}

@Override
public Mono<ResponseEntity<TopicDTO>> updateTopic(
String clusterName, String topicName, @Valid Mono<TopicUpdateDTO> topicUpdate,
Expand Down
Loading
Loading