From 6915166b1965624608e52ae44ed043a40f939352 Mon Sep 17 00:00:00 2001 From: Elia Bracci Date: Mon, 13 Apr 2026 15:38:59 +0200 Subject: [PATCH] PEEL-1526 Update dependencies to AWS SDK v2 and refactor SQS integration - Upgraded Go version to 1.24 in go.mod. - Replaced AWS SDK v1 with AWS SDK v2 for SQS operations, updating function signatures and types accordingly. - Simplified environment variable checks in consumer and producer examples, removing unnecessary AWS_PROFILE and AWS_SHARED_CREDENTIALS_FILE requirements. - Enhanced error handling and context usage in SQS methods. - Updated tests to reflect changes in the SQS client interface and ensure compatibility with the new SDK. --- go.mod | 22 +++++-- go.sum | 30 ++++++++++ internal/examples/sqs/consumer/consumer.go | 5 +- internal/examples/sqs/producer/producer.go | 5 +- sqs/mocks/sqsclient.go | 55 +++++++++++------ sqs/sqs.go | 55 ++++++----------- sqs/sqs_ack.go | 5 +- sqs/sqs_consume.go | 61 ++++++++++--------- sqs/sqs_enqueue.go | 32 +++++----- sqs/sqs_ping.go | 12 ++-- sqs/sqs_test.go | 70 +++++++++++----------- 11 files changed, 203 insertions(+), 149 deletions(-) diff --git a/go.mod b/go.mod index 01ac2ef..1a6bd27 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,25 @@ module github.com/simodima/squeue -go 1.18 +go 1.24 -require ( - github.com/aws/aws-sdk-go v1.55.5 - github.com/golang/mock v1.6.0 -) +require github.com/golang/mock v1.6.0 require ( + github.com/aws/aws-sdk-go-v2 v1.41.5 // indirect + github.com/aws/aws-sdk-go-v2/config v1.32.14 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.19.14 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.21 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.21 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.21 // indirect + github.com/aws/aws-sdk-go-v2/service/signin v1.0.9 // indirect + github.com/aws/aws-sdk-go-v2/service/sqs v1.42.25 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.30.15 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.19 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.41.10 // indirect + github.com/aws/smithy-go v1.24.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 41c6b40..60ff2d0 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,36 @@ github.com/aws/aws-sdk-go v1.44.28 h1:h/OAqEqY18wq//v6h4GNPMmCkxuzSDrWuGyrvSiRqf github.com/aws/aws-sdk-go v1.44.28/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU= github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= +github.com/aws/aws-sdk-go-v2 v1.41.5 h1:dj5kopbwUsVUVFgO4Fi5BIT3t4WyqIDjGKCangnV/yY= +github.com/aws/aws-sdk-go-v2 v1.41.5/go.mod h1:mwsPRE8ceUUpiTgF7QmQIJ7lgsKUPQOUl3o72QBrE1o= +github.com/aws/aws-sdk-go-v2/config v1.32.14 h1:opVIRo/ZbbI8OIqSOKmpFaY7IwfFUOCCXBsUpJOwDdI= +github.com/aws/aws-sdk-go-v2/config v1.32.14/go.mod h1:U4/V0uKxh0Tl5sxmCBZ3AecYny4UNlVmObYjKuuaiOo= +github.com/aws/aws-sdk-go-v2/credentials v1.19.14 h1:n+UcGWAIZHkXzYt87uMFBv/l8THYELoX6gVcUvgl6fI= +github.com/aws/aws-sdk-go-v2/credentials v1.19.14/go.mod h1:cJKuyWB59Mqi0jM3nFYQRmnHVQIcgoxjEMAbLkpr62w= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.21 h1:NUS3K4BTDArQqNu2ih7yeDLaS3bmHD0YndtA6UP884g= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.21/go.mod h1:YWNWJQNjKigKY1RHVJCuupeWDrrHjRqHm0N9rdrWzYI= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.21 h1:Rgg6wvjjtX8bNHcvi9OnXWwcE0a2vGpbwmtICOsvcf4= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.21/go.mod h1:A/kJFst/nm//cyqonihbdpQZwiUhhzpqTsdbhDdRF9c= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21 h1:PEgGVtPoB6NTpPrBgqSE5hE/o47Ij9qk/SEZFbUOe9A= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21/go.mod h1:p+hz+PRAYlY3zcpJhPwXlLC4C+kqn70WIHwnzAfs6ps= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6 h1:qYQ4pzQ2Oz6WpQ8T3HvGHnZydA72MnLuFK9tJwmrbHw= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6/go.mod h1:O3h0IK87yXci+kg6flUKzJnWeziQUKciKrLjcatSNcY= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7 h1:5EniKhLZe4xzL7a+fU3C2tfUN4nWIqlLesfrjkuPFTY= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7/go.mod h1:x0nZssQ3qZSnIcePWLvcoFisRXJzcTVvYpAAdYX8+GI= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.21 h1:c31//R3xgIJMSC8S6hEVq+38DcvUlgFY0FM6mSI5oto= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.21/go.mod h1:r6+pf23ouCB718FUxaqzZdbpYFyDtehyZcmP5KL9FkA= +github.com/aws/aws-sdk-go-v2/service/signin v1.0.9 h1:QKZH0S178gCmFEgst8hN0mCX1KxLgHBKKY/CLqwP8lg= +github.com/aws/aws-sdk-go-v2/service/signin v1.0.9/go.mod h1:7yuQJoT+OoH8aqIxw9vwF+8KpvLZ8AWmvmUWHsGQZvI= +github.com/aws/aws-sdk-go-v2/service/sqs v1.42.25 h1:8Bv3TQ1Cob6HLlpUbAnWxeHhAkYScJO9RIHh2WPXaxw= +github.com/aws/aws-sdk-go-v2/service/sqs v1.42.25/go.mod h1:eDstEbM0OEnBUnNQxIA7j74Jy61cCU1S4EMlCtdMwzs= +github.com/aws/aws-sdk-go-v2/service/sso v1.30.15 h1:lFd1+ZSEYJZYvv9d6kXzhkZu07si3f+GQ1AaYwa2LUM= +github.com/aws/aws-sdk-go-v2/service/sso v1.30.15/go.mod h1:WSvS1NLr7JaPunCXqpJnWk1Bjo7IxzZXrZi1QQCkuqM= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.19 h1:dzztQ1YmfPrxdrOiuZRMF6fuOwWlWpD2StNLTceKpys= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.19/go.mod h1:YO8TrYtFdl5w/4vmjL8zaBSsiNp3w0L1FfKVKenZT7w= +github.com/aws/aws-sdk-go-v2/service/sts v1.41.10 h1:p8ogvvLugcR/zLBXTXrTkj0RYBUdErbMnAFFp12Lm/U= +github.com/aws/aws-sdk-go-v2/service/sts v1.41.10/go.mod h1:60dv0eZJfeVXfbT1tFJinbHrDfSJ2GZl4Q//OSSNAVw= +github.com/aws/smithy-go v1.24.2 h1:FzA3bu/nt/vDvmnkg+R8Xl46gmzEDam6mZ1hzmwXFng= +github.com/aws/smithy-go v1.24.2/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/internal/examples/sqs/consumer/consumer.go b/internal/examples/sqs/consumer/consumer.go index f63bacf..a06ea30 100644 --- a/internal/examples/sqs/consumer/consumer.go +++ b/internal/examples/sqs/consumer/consumer.go @@ -34,12 +34,11 @@ func main() { log.Fatal("Error loading .env file") } - if !sqsexample.CheckEnvVariables("AWS_PROFILE", "AWS_SHARED_CREDENTIALS_FILE", "AWS_REGION", "AWS_QUEUE_URL") { + if !sqsexample.CheckEnvVariables("AWS_REGION", "AWS_QUEUE_URL") { log.Fatal(`Please set the env variables - AWS_PROFILE=user-dev-admin - AWS_SHARED_CREDENTIALS_FILE=/Users/{name.lastname}/.aws/credentials AWS_REGION=eu-central-1 AWS_QUEUE_URL=https://sqs.eu-central-1.amazonaws.com/... + Credentials are resolved via the AWS SDK default chain (env vars, ~/.aws/credentials, Pod Identity, etc.) `) } diff --git a/internal/examples/sqs/producer/producer.go b/internal/examples/sqs/producer/producer.go index 06ebf73..80bd52f 100644 --- a/internal/examples/sqs/producer/producer.go +++ b/internal/examples/sqs/producer/producer.go @@ -19,12 +19,11 @@ func init() { log.Fatal("Error loading .env file") } - if !sqsexample.CheckEnvVariables("AWS_PROFILE", "AWS_SHARED_CREDENTIALS_FILE", "AWS_REGION", "AWS_QUEUE_URL") { + if !sqsexample.CheckEnvVariables("AWS_REGION", "AWS_QUEUE_URL") { log.Fatal(`Please set the env variables - AWS_PROFILE=user-dev-admin - AWS_SHARED_CREDENTIALS_FILE=/Users/{name.lastname}/.aws/credentials AWS_REGION=eu-central-1 AWS_QUEUE_URL=https://sqs.eu-central-1.amazonaws.com/... + Credentials are resolved via the AWS SDK default chain (env vars, ~/.aws/credentials, Pod Identity, etc.) `) } } diff --git a/sqs/mocks/sqsclient.go b/sqs/mocks/sqsclient.go index 5915ab0..2ef1872 100644 --- a/sqs/mocks/sqsclient.go +++ b/sqs/mocks/sqsclient.go @@ -5,9 +5,10 @@ package mock_sqs import ( + context "context" reflect "reflect" - sqs "github.com/aws/aws-sdk-go/service/sqs" + sqs "github.com/aws/aws-sdk-go-v2/service/sqs" gomock "github.com/golang/mock/gomock" ) @@ -35,61 +36,81 @@ func (m *MocksqsClient) EXPECT() *MocksqsClientMockRecorder { } // DeleteMessage mocks base method. -func (m *MocksqsClient) DeleteMessage(input *sqs.DeleteMessageInput) (*sqs.DeleteMessageOutput, error) { +func (m *MocksqsClient) DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteMessage", input) + varargs := []interface{}{ctx, params} + for _, a := range optFns { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "DeleteMessage", varargs...) ret0, _ := ret[0].(*sqs.DeleteMessageOutput) ret1, _ := ret[1].(error) return ret0, ret1 } // DeleteMessage indicates an expected call of DeleteMessage. -func (mr *MocksqsClientMockRecorder) DeleteMessage(input interface{}) *gomock.Call { +func (mr *MocksqsClientMockRecorder) DeleteMessage(ctx, params interface{}, optFns ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteMessage", reflect.TypeOf((*MocksqsClient)(nil).DeleteMessage), input) + varargs := append([]interface{}{ctx, params}, optFns...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteMessage", reflect.TypeOf((*MocksqsClient)(nil).DeleteMessage), varargs...) } // GetQueueAttributes mocks base method. -func (m *MocksqsClient) GetQueueAttributes(input *sqs.GetQueueAttributesInput) (*sqs.GetQueueAttributesOutput, error) { +func (m *MocksqsClient) GetQueueAttributes(ctx context.Context, params *sqs.GetQueueAttributesInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueAttributesOutput, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetQueueAttributes", input) + varargs := []interface{}{ctx, params} + for _, a := range optFns { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "GetQueueAttributes", varargs...) ret0, _ := ret[0].(*sqs.GetQueueAttributesOutput) ret1, _ := ret[1].(error) return ret0, ret1 } // GetQueueAttributes indicates an expected call of GetQueueAttributes. -func (mr *MocksqsClientMockRecorder) GetQueueAttributes(input interface{}) *gomock.Call { +func (mr *MocksqsClientMockRecorder) GetQueueAttributes(ctx, params interface{}, optFns ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetQueueAttributes", reflect.TypeOf((*MocksqsClient)(nil).GetQueueAttributes), input) + varargs := append([]interface{}{ctx, params}, optFns...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetQueueAttributes", reflect.TypeOf((*MocksqsClient)(nil).GetQueueAttributes), varargs...) } // ReceiveMessage mocks base method. -func (m *MocksqsClient) ReceiveMessage(input *sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error) { +func (m *MocksqsClient) ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ReceiveMessage", input) + varargs := []interface{}{ctx, params} + for _, a := range optFns { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ReceiveMessage", varargs...) ret0, _ := ret[0].(*sqs.ReceiveMessageOutput) ret1, _ := ret[1].(error) return ret0, ret1 } // ReceiveMessage indicates an expected call of ReceiveMessage. -func (mr *MocksqsClientMockRecorder) ReceiveMessage(input interface{}) *gomock.Call { +func (mr *MocksqsClientMockRecorder) ReceiveMessage(ctx, params interface{}, optFns ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReceiveMessage", reflect.TypeOf((*MocksqsClient)(nil).ReceiveMessage), input) + varargs := append([]interface{}{ctx, params}, optFns...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReceiveMessage", reflect.TypeOf((*MocksqsClient)(nil).ReceiveMessage), varargs...) } // SendMessage mocks base method. -func (m *MocksqsClient) SendMessage(input *sqs.SendMessageInput) (*sqs.SendMessageOutput, error) { +func (m *MocksqsClient) SendMessage(ctx context.Context, params *sqs.SendMessageInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageOutput, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SendMessage", input) + varargs := []interface{}{ctx, params} + for _, a := range optFns { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "SendMessage", varargs...) ret0, _ := ret[0].(*sqs.SendMessageOutput) ret1, _ := ret[1].(error) return ret0, ret1 } // SendMessage indicates an expected call of SendMessage. -func (mr *MocksqsClientMockRecorder) SendMessage(input interface{}) *gomock.Call { +func (mr *MocksqsClientMockRecorder) SendMessage(ctx, params interface{}, optFns ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMessage", reflect.TypeOf((*MocksqsClient)(nil).SendMessage), input) + varargs := append([]interface{}{ctx, params}, optFns...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMessage", reflect.TypeOf((*MocksqsClient)(nil).SendMessage), varargs...) } diff --git a/sqs/sqs.go b/sqs/sqs.go index de2afdd..699b882 100644 --- a/sqs/sqs.go +++ b/sqs/sqs.go @@ -1,23 +1,20 @@ package sqs import ( - "errors" + "context" "fmt" "net/url" - "os" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/sqs" + "github.com/aws/aws-sdk-go-v2/config" + sqsv2 "github.com/aws/aws-sdk-go-v2/service/sqs" ) //go:generate mockgen -source=sqs.go -destination=mocks/sqsclient.go type sqsClient interface { - DeleteMessage(input *sqs.DeleteMessageInput) (*sqs.DeleteMessageOutput, error) - SendMessage(input *sqs.SendMessageInput) (*sqs.SendMessageOutput, error) - ReceiveMessage(input *sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error) - GetQueueAttributes(input *sqs.GetQueueAttributesInput) (*sqs.GetQueueAttributesOutput, error) + DeleteMessage(ctx context.Context, params *sqsv2.DeleteMessageInput, optFns ...func(*sqsv2.Options)) (*sqsv2.DeleteMessageOutput, error) + SendMessage(ctx context.Context, params *sqsv2.SendMessageInput, optFns ...func(*sqsv2.Options)) (*sqsv2.SendMessageOutput, error) + ReceiveMessage(ctx context.Context, params *sqsv2.ReceiveMessageInput, optFns ...func(*sqsv2.Options)) (*sqsv2.ReceiveMessageOutput, error) + GetQueueAttributes(ctx context.Context, params *sqsv2.GetQueueAttributesInput, optFns ...func(*sqsv2.Options)) (*sqsv2.GetQueueAttributesOutput, error) } type Driver struct { @@ -37,12 +34,7 @@ func New(options ...Option) (*Driver, error) { } if driver.sqsClient == nil { - clientCredentials, err := getCredentials() - if err != nil { - return nil, err - } - - client, err := createClient(driver.url, driver.region, clientCredentials) + client, err := createClient(driver.url, driver.region) if err != nil { return nil, err } @@ -59,31 +51,22 @@ func New(options ...Option) (*Driver, error) { return driver, nil } -func getCredentials() (*credentials.Credentials, error) { - if os.Getenv("AWS_SHARED_CREDENTIALS_FILE") != "" { - return credentials.NewSharedCredentials("", ""), nil - } else if os.Getenv("AWS_ACCESS_KEY_ID") != "" && os.Getenv("AWS_SECRET_ACCESS_KEY") != "" { - return credentials.NewEnvCredentials(), nil - } - - return nil, errors.New( - "missing AWS_SHARED_CREDENTIALS_FILE and AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY env vars", - ) -} - -func createClient(queueUrl string, region string, clientCredentials *credentials.Credentials) (*sqs.SQS, error) { +func createClient(queueUrl string, region string) (*sqsv2.Client, error) { parsedUrl, err := url.ParseRequestURI(queueUrl) if err != nil { return nil, fmt.Errorf("error creating sqs client: %w", err) } - options := session.Options{ - Config: aws.Config{ - Endpoint: aws.String(fmt.Sprintf("%s://%s", parsedUrl.Scheme, parsedUrl.Host)), - Region: aws.String(region), - Credentials: clientCredentials, - }, + endpoint := fmt.Sprintf("%s://%s", parsedUrl.Scheme, parsedUrl.Host) + + cfg, err := config.LoadDefaultConfig( + context.Background(), + config.WithRegion(region), + config.WithBaseEndpoint(endpoint), + ) + if err != nil { + return nil, fmt.Errorf("error loading AWS config: %w", err) } - return sqs.New(session.Must(session.NewSessionWithOptions(options))), nil + return sqsv2.NewFromConfig(cfg), nil } diff --git a/sqs/sqs_ack.go b/sqs/sqs_ack.go index 4f74549..1d30178 100644 --- a/sqs/sqs_ack.go +++ b/sqs/sqs_ack.go @@ -1,9 +1,10 @@ package sqs import ( + "context" "errors" - "github.com/aws/aws-sdk-go/service/sqs" + sqsv2 "github.com/aws/aws-sdk-go-v2/service/sqs" ) func (c *Driver) Ack(queue, messageID string) error { @@ -11,7 +12,7 @@ func (c *Driver) Ack(queue, messageID string) error { return errors.New("invalid sqs driver") } - _, err := c.sqsClient.DeleteMessage(&sqs.DeleteMessageInput{ + _, err := c.sqsClient.DeleteMessage(context.Background(), &sqsv2.DeleteMessageInput{ QueueUrl: &queue, ReceiptHandle: &messageID, }) diff --git a/sqs/sqs_consume.go b/sqs/sqs_consume.go index 1d929f7..b9b3b50 100644 --- a/sqs/sqs_consume.go +++ b/sqs/sqs_consume.go @@ -1,63 +1,68 @@ package sqs import ( + "context" "fmt" "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/sqs" + "github.com/aws/aws-sdk-go-v2/aws" + sqsv2 "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/simodima/squeue/driver" ) -func safeDoOnReceiveMessage(do func(*sqs.ReceiveMessageInput)) func(m any) { +func safeDoOnReceiveMessage(do func(*sqsv2.ReceiveMessageInput)) func(m any) { return func(m any) { - if SQSMessage, ok := m.(*sqs.ReceiveMessageInput); ok { + if SQSMessage, ok := m.(*sqsv2.ReceiveMessageInput); ok { do(SQSMessage) } } } -func WithConsumeWaitTimeSeconds(wait int64) func(m any) { - return safeDoOnReceiveMessage(func(message *sqs.ReceiveMessageInput) { - message.WaitTimeSeconds = aws.Int64(wait) +func WithConsumeWaitTimeSeconds(wait int32) func(m any) { + return safeDoOnReceiveMessage(func(message *sqsv2.ReceiveMessageInput) { + message.WaitTimeSeconds = wait }) } -func WithConsumeVisibilityTimeout(timeout int64) func(m any) { - return safeDoOnReceiveMessage(func(message *sqs.ReceiveMessageInput) { - message.VisibilityTimeout = aws.Int64(timeout) +func WithConsumeVisibilityTimeout(timeout int32) func(m any) { + return safeDoOnReceiveMessage(func(message *sqsv2.ReceiveMessageInput) { + message.VisibilityTimeout = timeout }) } func WithConsumeRequestAttemptId(id string) func(m any) { - return safeDoOnReceiveMessage(func(message *sqs.ReceiveMessageInput) { + return safeDoOnReceiveMessage(func(message *sqsv2.ReceiveMessageInput) { message.ReceiveRequestAttemptId = aws.String(id) }) } func WithConsumeMessageSystemAttributeNames(attributes []string) func(m any) { - return safeDoOnReceiveMessage(func(message *sqs.ReceiveMessageInput) { + return safeDoOnReceiveMessage(func(message *sqsv2.ReceiveMessageInput) { if len(attributes) == 0 { - message.MessageSystemAttributeNames = aws.StringSlice( - []string{sqs.MessageSystemAttributeNameAll}, - ) + message.MessageSystemAttributeNames = []types.MessageSystemAttributeName{ + types.MessageSystemAttributeNameAll, + } } else { - message.MessageSystemAttributeNames = aws.StringSlice(attributes) + names := make([]types.MessageSystemAttributeName, len(attributes)) + for i, a := range attributes { + names[i] = types.MessageSystemAttributeName(a) + } + message.MessageSystemAttributeNames = names } - }) } func WithConsumeMessageAttributeNames(names []string) func(m any) { - return safeDoOnReceiveMessage(func(message *sqs.ReceiveMessageInput) { - message.MessageAttributeNames = aws.StringSlice(names) + return safeDoOnReceiveMessage(func(message *sqsv2.ReceiveMessageInput) { + message.MessageAttributeNames = names }) } func WithConsumeMaxNumberOfMessages(max int) func(m any) { - return safeDoOnReceiveMessage(func(message *sqs.ReceiveMessageInput) { - message.MaxNumberOfMessages = aws.Int64(int64(max)) + return safeDoOnReceiveMessage(func(message *sqsv2.ReceiveMessageInput) { + message.MaxNumberOfMessages = int32(max) }) } @@ -97,11 +102,11 @@ func (d *Driver) Consume(queue string, opts ...func(message any)) (*driver.Consu } func (d *Driver) fetchMessages(queue string, opts ...func(message any)) ([][2]string, error) { - req := &sqs.ReceiveMessageInput{ - VisibilityTimeout: aws.Int64(90), - MaxNumberOfMessages: aws.Int64(10), - MessageAttributeNames: []*string{ - aws.String(sqs.QueueAttributeNameAll), + req := &sqsv2.ReceiveMessageInput{ + VisibilityTimeout: 90, + MaxNumberOfMessages: 10, + MessageAttributeNames: []string{ + string(types.QueueAttributeNameAll), }, QueueUrl: &queue, } @@ -110,14 +115,12 @@ func (d *Driver) fetchMessages(queue string, opts ...func(message any)) ([][2]st o(req) } - msgResult, err := d.sqsClient.ReceiveMessage(req) - + msgResult, err := d.sqsClient.ReceiveMessage(context.Background(), req) if err != nil { return nil, err } messages := [][2]string{} - for _, m := range msgResult.Messages { messages = append(messages, [2]string{*m.Body, *m.ReceiptHandle}) } diff --git a/sqs/sqs_enqueue.go b/sqs/sqs_enqueue.go index bda887c..df868ef 100644 --- a/sqs/sqs_enqueue.go +++ b/sqs/sqs_enqueue.go @@ -1,46 +1,48 @@ package sqs import ( + "context" "fmt" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/sqs" + "github.com/aws/aws-sdk-go-v2/aws" + sqsv2 "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" ) -func safeDoOnSendMessage(do func(*sqs.SendMessageInput)) func(m any) { +func safeDoOnSendMessage(do func(*sqsv2.SendMessageInput)) func(m any) { return func(m any) { - if SQSMessage, ok := m.(*sqs.SendMessageInput); ok { + if SQSMessage, ok := m.(*sqsv2.SendMessageInput); ok { do(SQSMessage) } } } -func WithEnqueueDelaySeconds(delay int64) func(m any) { - return safeDoOnSendMessage(func(message *sqs.SendMessageInput) { - message.DelaySeconds = &delay +func WithEnqueueDelaySeconds(delay int32) func(m any) { + return safeDoOnSendMessage(func(message *sqsv2.SendMessageInput) { + message.DelaySeconds = delay }) } -func WithEnqueueMessageAttributes(attrs map[string]*sqs.MessageAttributeValue) func(m any) { - return safeDoOnSendMessage(func(message *sqs.SendMessageInput) { +func WithEnqueueMessageAttributes(attrs map[string]types.MessageAttributeValue) func(m any) { + return safeDoOnSendMessage(func(message *sqsv2.SendMessageInput) { message.MessageAttributes = attrs }) } -func WithEnqueueMessageSystemAttributes(attrs map[string]*sqs.MessageSystemAttributeValue) func(m any) { - return safeDoOnSendMessage(func(message *sqs.SendMessageInput) { +func WithEnqueueMessageSystemAttributes(attrs map[string]types.MessageSystemAttributeValue) func(m any) { + return safeDoOnSendMessage(func(message *sqsv2.SendMessageInput) { message.MessageSystemAttributes = attrs }) } func WithEnqueueMessageDeduplicationId(id string) func(m any) { - return safeDoOnSendMessage(func(message *sqs.SendMessageInput) { + return safeDoOnSendMessage(func(message *sqsv2.SendMessageInput) { message.MessageDeduplicationId = &id }) } func WithEnqueueMessageGroupId(id string) func(m any) { - return safeDoOnSendMessage(func(message *sqs.SendMessageInput) { + return safeDoOnSendMessage(func(message *sqsv2.SendMessageInput) { message.MessageGroupId = &id }) } @@ -50,7 +52,7 @@ func (d *Driver) Enqueue(queue string, data []byte, opts ...func(message any)) e return fmt.Errorf("invalid SQS client") } - req := &sqs.SendMessageInput{ + req := &sqsv2.SendMessageInput{ MessageBody: aws.String(string(data)), QueueUrl: &queue, } @@ -59,7 +61,7 @@ func (d *Driver) Enqueue(queue string, data []byte, opts ...func(message any)) e opt(req) } - _, err := d.sqsClient.SendMessage(req) + _, err := d.sqsClient.SendMessage(context.Background(), req) return err } diff --git a/sqs/sqs_ping.go b/sqs/sqs_ping.go index f2424fa..6d6652a 100644 --- a/sqs/sqs_ping.go +++ b/sqs/sqs_ping.go @@ -1,14 +1,16 @@ package sqs import ( - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/sqs" + "context" + + sqsv2 "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" ) func (d *Driver) Ping() error { - _, err := d.sqsClient.GetQueueAttributes(&sqs.GetQueueAttributesInput{ - AttributeNames: []*string{aws.String("All")}, - QueueUrl: aws.String(d.url), + _, err := d.sqsClient.GetQueueAttributes(context.Background(), &sqsv2.GetQueueAttributesInput{ + AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameAll}, + QueueUrl: &d.url, }) return err } diff --git a/sqs/sqs_test.go b/sqs/sqs_test.go index 7adf42f..cf171b9 100644 --- a/sqs/sqs_test.go +++ b/sqs/sqs_test.go @@ -6,8 +6,9 @@ import ( "sync" "testing" - "github.com/aws/aws-sdk-go/aws" - awssqs "github.com/aws/aws-sdk-go/service/sqs" + "github.com/aws/aws-sdk-go-v2/aws" + sqsv2 "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/golang/mock/gomock" "github.com/stretchr/testify/suite" @@ -27,7 +28,7 @@ func (suite *SQSTestSuite) SetupTest() { // cleanup environment os.Setenv("AWS_SHARED_CREDENTIALS_FILE", "") os.Setenv("AWS_ACCESS_KEY_ID", "") - os.Setenv("AWS_ACCESS_SECRET_KEY", "") + os.Setenv("AWS_SECRET_ACCESS_KEY", "") suite.ctrl = gomock.NewController(suite.T()) suite.sqsMock = mock_sqs.NewMocksqsClient(suite.ctrl) @@ -41,8 +42,6 @@ func (suite *SQSTestSuite) TearDownTest() { } func (suite *SQSTestSuite) TestNewWIthUrlAndRegionOption() { - os.Setenv("AWS_SHARED_CREDENTIALS_FILE", "test") - _, err := sqs.New( sqs.WithUrl("https://sqs.eu-central-1.amazonaws.com"), sqs.WithRegion("us-east-1"), @@ -59,7 +58,6 @@ func (suite *SQSTestSuite) TestNewWithDefaultOptions() { } func (suite *SQSTestSuite) TestNew_InvalidQueueURL() { - os.Setenv("AWS_SHARED_CREDENTIALS_FILE", "/a/file") _, err := sqs.New( sqs.WithUrl("-"), ) @@ -76,18 +74,19 @@ func (suite *SQSTestSuite) TestNewWithAClient() { } func (suite *SQSTestSuite) TestNewAutoTestConnectionSuccess() { + queueUrl := "aws-sqs-queue-url" suite.sqsMock. EXPECT(). - GetQueueAttributes(&awssqs.GetQueueAttributesInput{ - AttributeNames: []*string{aws.String("All")}, - QueueUrl: aws.String("aws-sqs-queue-url"), + GetQueueAttributes(gomock.Any(), &sqsv2.GetQueueAttributesInput{ + AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameAll}, + QueueUrl: &queueUrl, }). - Return(&awssqs.GetQueueAttributesOutput{}, nil) + Return(&sqsv2.GetQueueAttributesOutput{}, nil) sqsDriver, err := sqs.New( sqs.WithClient(suite.sqsMock), sqs.AutoTestConnection(), - sqs.WithUrl("aws-sqs-queue-url"), + sqs.WithUrl(queueUrl), ) suite.Nil(err) @@ -95,18 +94,19 @@ func (suite *SQSTestSuite) TestNewAutoTestConnectionSuccess() { } func (suite *SQSTestSuite) TestNewAutoTestConnectionFail() { + queueUrl := "aws-sqs-queue-url" suite.sqsMock. EXPECT(). - GetQueueAttributes(&awssqs.GetQueueAttributesInput{ - AttributeNames: []*string{aws.String("All")}, - QueueUrl: aws.String("aws-sqs-queue-url"), + GetQueueAttributes(gomock.Any(), &sqsv2.GetQueueAttributesInput{ + AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameAll}, + QueueUrl: &queueUrl, }). Return(nil, errors.New("error calling aws")) sqsDriver, err := sqs.New( sqs.WithClient(suite.sqsMock), sqs.AutoTestConnection(), - sqs.WithUrl("aws-sqs-queue-url"), + sqs.WithUrl(queueUrl), ) suite.NotNil(err) @@ -114,20 +114,21 @@ func (suite *SQSTestSuite) TestNewAutoTestConnectionFail() { } func (suite *SQSTestSuite) TestEnqueueSuccess() { + testQueue := "test-queue" suite.sqsMock.EXPECT(). - SendMessage(&awssqs.SendMessageInput{ + SendMessage(gomock.Any(), &sqsv2.SendMessageInput{ MessageBody: aws.String("test message"), - QueueUrl: aws.String("test-queue"), - DelaySeconds: aws.Int64(1), + QueueUrl: &testQueue, + DelaySeconds: 1, MessageDeduplicationId: aws.String("dedup-id-1"), MessageGroupId: aws.String("group-id-1"), - MessageAttributes: map[string]*awssqs.MessageAttributeValue{ + MessageAttributes: map[string]types.MessageAttributeValue{ "tenant": { DataType: aws.String("String"), StringValue: aws.String("tenant-1"), }, }, - MessageSystemAttributes: map[string]*awssqs.MessageSystemAttributeValue{ + MessageSystemAttributes: map[string]types.MessageSystemAttributeValue{ "request-id": { DataType: aws.String("String"), StringValue: aws.String("12345"), @@ -141,18 +142,18 @@ func (suite *SQSTestSuite) TestEnqueueSuccess() { )) err := sqsDriver.Enqueue( - "test-queue", + testQueue, []byte("test message"), sqs.WithEnqueueDelaySeconds(1), sqs.WithEnqueueMessageGroupId("group-id-1"), sqs.WithEnqueueMessageDeduplicationId("dedup-id-1"), - sqs.WithEnqueueMessageAttributes(map[string]*awssqs.MessageAttributeValue{ + sqs.WithEnqueueMessageAttributes(map[string]types.MessageAttributeValue{ "tenant": { DataType: aws.String("String"), StringValue: aws.String("tenant-1"), }, }), - sqs.WithEnqueueMessageSystemAttributes(map[string]*awssqs.MessageSystemAttributeValue{ + sqs.WithEnqueueMessageSystemAttributes(map[string]types.MessageSystemAttributeValue{ "request-id": { DataType: aws.String("String"), StringValue: aws.String("12345"), @@ -164,18 +165,19 @@ func (suite *SQSTestSuite) TestEnqueueSuccess() { } func (suite *SQSTestSuite) TestConsumeSuccess() { + testQueue := "test-queue" suite.sqsMock.EXPECT(). - ReceiveMessage(&awssqs.ReceiveMessageInput{ - MaxNumberOfMessages: aws.Int64(9), - MessageAttributeNames: []*string{aws.String("All")}, - MessageSystemAttributeNames: []*string{aws.String("All")}, - QueueUrl: aws.String("test-queue"), + ReceiveMessage(gomock.Any(), &sqsv2.ReceiveMessageInput{ + MaxNumberOfMessages: 9, + MessageAttributeNames: []string{"All"}, + MessageSystemAttributeNames: []types.MessageSystemAttributeName{types.MessageSystemAttributeNameAll}, + QueueUrl: &testQueue, ReceiveRequestAttemptId: aws.String("attempt-1"), - VisibilityTimeout: aws.Int64(2), - WaitTimeSeconds: aws.Int64(1), + VisibilityTimeout: 2, + WaitTimeSeconds: 1, }). - Return(&awssqs.ReceiveMessageOutput{ - Messages: []*awssqs.Message{ + Return(&sqsv2.ReceiveMessageOutput{ + Messages: []types.Message{ {Body: aws.String(`{"id": 1}`), ReceiptHandle: aws.String("1")}, {Body: aws.String(`{"id": 2}`), ReceiptHandle: aws.String("1")}, {Body: aws.String(`{"id": 3}`), ReceiptHandle: aws.String("1")}, @@ -187,7 +189,7 @@ func (suite *SQSTestSuite) TestConsumeSuccess() { )) ctrl, err := sqsDriver.Consume( - "test-queue", + testQueue, sqs.WithConsumeWaitTimeSeconds(1), sqs.WithConsumeVisibilityTimeout(2), sqs.WithConsumeRequestAttemptId("attempt-1"), @@ -229,7 +231,7 @@ func (suite *SQSTestSuite) TestConsumeSuccess() { func (suite *SQSTestSuite) TestEnqueueFail() { testQueue := "test-queue" suite.sqsMock.EXPECT(). - SendMessage(&awssqs.SendMessageInput{ + SendMessage(gomock.Any(), &sqsv2.SendMessageInput{ MessageBody: aws.String("test message"), QueueUrl: &testQueue, }).