7575import io .confluent .connect .storage .partitioner .HourlyPartitioner ;
7676import io .confluent .connect .storage .partitioner .PartitionerConfig ;
7777import io .confluent .connect .storage .partitioner .TimeBasedPartitioner ;
78+ import org .slf4j .Logger ;
79+ import org .slf4j .LoggerFactory ;
7880
7981import static org .apache .kafka .common .config .ConfigDef .Range .atLeast ;
8082
8183public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {
8284
85+ private static final Logger log = LoggerFactory .getLogger (S3SinkConnectorConfig .class );
8386 // S3 Group
8487 public static final String S3_BUCKET_CONFIG = "s3.bucket.name" ;
8588
@@ -949,8 +952,8 @@ public String awsMiddlewareRoleARN() {
949952 return getString (MIDDLEWARE_ROLE_ARN_CONFIG );
950953 }
951954
952- public String awsExternalId () {
953- return getString (CUSTOMER_ROLE_EXTERNAL_ID_CONFIG );
955+ public Password awsExternalId () {
956+ return getPassword (CUSTOMER_ROLE_EXTERNAL_ID_CONFIG );
954957 }
955958
956959 public int getPartSize () {
@@ -966,32 +969,37 @@ public AWSCredentialsProvider getCredentialsProvider() {
966969 try {
967970 AWSCredentialsProvider provider = ((Class <? extends AWSCredentialsProvider >)
968971 getClass (S3SinkConnectorConfig .CREDENTIALS_PROVIDER_CLASS_CONFIG )).newInstance ();
972+
973+ String authMethod = getAuthenticationMethod ();
974+ log .info ("Authentication method: {}" , authMethod );
969975
970976 if (provider instanceof Configurable ) {
971977 Map <String , Object > configs = originalsWithPrefix (CREDENTIALS_PROVIDER_CONFIG_PREFIX );
972978 configs .remove (CREDENTIALS_PROVIDER_CLASS_CONFIG .substring (
973979 CREDENTIALS_PROVIDER_CONFIG_PREFIX .length ()
974980 ));
975981
976- String authMethod = getAuthenticationMethod ();
977- if (authMethod == "IAM Assume Role" ) {
982+ configs .put (AWS_ACCESS_KEY_ID_CONFIG , awsAccessKeyId ());
983+ configs .put (AWS_SECRET_ACCESS_KEY_CONFIG , awsSecretKeyId ().value ());
984+ ((Configurable ) provider ).configure (configs );
985+ } else {
986+ authMethod = getAuthenticationMethod ();
987+
988+ if (authMethod .equals ("IAM Assume Role" )) {
989+ Map <String , Object > configs = new HashMap <String , Object >();
978990 configs .put (CUSTOMER_ROLE_ARN_CONFIG , awsCustomerRoleARN ());
979- configs .put (CUSTOMER_ROLE_EXTERNAL_ID_CONFIG , awsExternalId ());
991+ configs .put (CUSTOMER_ROLE_EXTERNAL_ID_CONFIG , awsExternalId (). value () );
980992 configs .put (MIDDLEWARE_ROLE_ARN_CONFIG , awsMiddlewareRoleARN ());
981993
982994 provider = new AwsIamAssumeRoleChaining ();
983995 ((AwsIamAssumeRoleChaining ) provider ).configure (configs );
984996 } else {
985- configs .put (AWS_ACCESS_KEY_ID_CONFIG , awsAccessKeyId ());
986- configs .put (AWS_SECRET_ACCESS_KEY_CONFIG , awsSecretKeyId ().value ());
987- ((Configurable ) provider ).configure (configs );
988- }
989- } else {
990- final String accessKeyId = awsAccessKeyId ();
991- final String secretKey = awsSecretKeyId ().value ();
992- if (StringUtils .isNotBlank (accessKeyId ) && StringUtils .isNotBlank (secretKey )) {
993- BasicAWSCredentials basicCredentials = new BasicAWSCredentials (accessKeyId , secretKey );
994- provider = new AWSStaticCredentialsProvider (basicCredentials );
997+ final String accessKeyId = awsAccessKeyId ();
998+ final String secretKey = awsSecretKeyId ().value ();
999+ if (StringUtils .isNotBlank (accessKeyId ) && StringUtils .isNotBlank (secretKey )) {
1000+ BasicAWSCredentials basicCredentials = new BasicAWSCredentials (accessKeyId , secretKey );
1001+ provider = new AWSStaticCredentialsProvider (basicCredentials );
1002+ }
9951003 }
9961004 }
9971005
0 commit comments