Skip to content

Commit

Permalink
streaming_sink: use Astra token (#312)
Browse files Browse the repository at this point in the history
Use Astra token directly for streaming sink operations instead of first
exchanging for a Pulsar token.
  • Loading branch information
pgier authored Sep 19, 2023
1 parent a64fbfe commit d3332f1
Showing 1 changed file with 10 additions and 31 deletions.
41 changes: 10 additions & 31 deletions internal/provider/resource_streaming_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,7 @@ func resourceStreamingSinkDelete(ctx context.Context, resourceData *schema.Resou
return diag.Errorf("\"deletion_protection\" must be explicitly set to \"false\" in order to destroy astra_streaming_sink")
}

streamingClient := meta.(astraClients).astraStreamingClient.(*astrastreaming.ClientWithResponses)
client := meta.(astraClients).astraClient.(*astra.ClientWithResponses)
astraClient := meta.(astraClients).astraClient.(*astra.ClientWithResponses)
streamingClientv3 := meta.(astraClients).astraStreamingClientv3

tenantName := resourceData.Get("tenant_name").(string)
Expand All @@ -142,7 +141,7 @@ func resourceStreamingSinkDelete(ctx context.Context, resourceData *schema.Resou

pulsarCluster := getPulsarCluster("", cloudProvider, region, "")

orgResp, err := client.GetCurrentOrganization(ctx)
orgResp, err := astraClient.GetCurrentOrganization(ctx)
if err != nil {
return diag.Errorf("failed to get current organization ID: %v", err)
}
Expand All @@ -152,15 +151,9 @@ func resourceStreamingSinkDelete(ctx context.Context, resourceData *schema.Resou
return diag.Errorf("failed to decode current organization ID: %v", err)
}

token := meta.(astraClients).token
pulsarToken, err := getPulsarToken(ctx, pulsarCluster, token, org, streamingClient, tenantName)
if err != nil {
return diag.FromErr(err)
}

deleteSinkParams := astrastreaming.DeleteSinkParams{
XDataStaxPulsarCluster: pulsarCluster,
Authorization: fmt.Sprintf("Bearer %s", pulsarToken),
Authorization: meta.(astraClients).token,
}

deleteSinkResponse, err := streamingClientv3.DeleteSinkWithResponse(ctx, tenantName, namespace, sinkName, &deleteSinkParams)
Expand Down Expand Up @@ -208,8 +201,7 @@ type SinkResponse struct {
}

func resourceStreamingSinkRead(ctx context.Context, resourceData *schema.ResourceData, meta interface{}) diag.Diagnostics {
streamingClient := meta.(astraClients).astraStreamingClient.(*astrastreaming.ClientWithResponses)
client := meta.(astraClients).astraClient.(*astra.ClientWithResponses)
astraClient := meta.(astraClients).astraClient.(*astra.ClientWithResponses)
streamingClientv3 := meta.(astraClients).astraStreamingClientv3

tenantName := resourceData.Get("tenant_name").(string)
Expand All @@ -223,7 +215,7 @@ func resourceStreamingSinkRead(ctx context.Context, resourceData *schema.Resourc

pulsarCluster := getPulsarCluster("", cloudProvider, region, "")

orgBody, err := client.GetCurrentOrganization(ctx)
orgBody, err := astraClient.GetCurrentOrganization(ctx)
if err != nil {
return diag.Errorf("failed to get current organization ID: %v", err)
}
Expand All @@ -233,15 +225,9 @@ func resourceStreamingSinkRead(ctx context.Context, resourceData *schema.Resourc
return diag.Errorf("failed to decode current organization ID: %v", err)
}

token := meta.(astraClients).token
pulsarToken, err := getPulsarToken(ctx, pulsarCluster, token, org, streamingClient, tenantName)
if err != nil {
diag.FromErr(err)
}

getSinksParams := astrastreaming.GetSinksParams{
XDataStaxPulsarCluster: pulsarCluster,
Authorization: fmt.Sprintf("Bearer %s", pulsarToken),
Authorization: meta.(astraClients).token,
}

getSinkResponse, err := streamingClientv3.GetSinksWithResponse(ctx, tenantName, namespace, sinkName, &getSinksParams)
Expand All @@ -262,8 +248,7 @@ func resourceStreamingSinkRead(ctx context.Context, resourceData *schema.Resourc
}

func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.ResourceData, meta interface{}) diag.Diagnostics {
streamingClient := meta.(astraClients).astraStreamingClient.(*astrastreaming.ClientWithResponses)
client := meta.(astraClients).astraClient.(*astra.ClientWithResponses)
astraClient := meta.(astraClients).astraClient.(*astra.ClientWithResponses)
streamingClientv3 := meta.(astraClients).astraStreamingClientv3

rawRegion := resourceData.Get("region").(string)
Expand All @@ -285,7 +270,7 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou
archive = fmt.Sprintf("builtin://%s", sinkName)
}

orgResp, err := client.GetCurrentOrganization(ctx)
orgResp, err := astraClient.GetCurrentOrganization(ctx)
if err != nil {
return diag.Errorf("failed to get current organization ID: %v", err)
}
Expand All @@ -295,7 +280,7 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou
return diag.Errorf("failed to decode current organization ID: %v", err)
}

streamingClustersResponse, err := streamingClient.GetPulsarClustersWithResponse(ctx, org.ID)
streamingClustersResponse, err := streamingClientv3.GetPulsarClustersWithResponse(ctx, org.ID)
if err != nil {
return diag.FromErr(fmt.Errorf("failed to request pulsar clusters: %w", err))
}
Expand All @@ -307,12 +292,6 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou

pulsarCluster := getPulsarCluster("", cloudProvider, region, "")

token := meta.(astraClients).token
pulsarToken, err := getPulsarToken(ctx, pulsarCluster, token, org, streamingClient, tenantName)
if err != nil {
diag.FromErr(err)
}

var configs map[string]interface{}
if err := json.Unmarshal([]byte(rawConfigs), &configs); err != nil {
return diag.Errorf("failed to unmarshal sink config: %v", err)
Expand All @@ -321,7 +300,7 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou
createSinkParams := astrastreaming.CreateSinkJSONParams{
XDataStaxPulsarCluster: pulsarCluster,
XDataStaxCurrentOrg: "",
Authorization: fmt.Sprintf("Bearer %s", pulsarToken),
Authorization: meta.(astraClients).token,
}

sinkInputs := []string{topic}
Expand Down

0 comments on commit d3332f1

Please sign in to comment.