// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package array import ( "bytes" "errors" "fmt" "strings" "sync/atomic" "github.com/apache/arrow/go/v10/arrow" "github.com/apache/arrow/go/v10/arrow/bitutil" "github.com/apache/arrow/go/v10/arrow/internal/debug" "github.com/apache/arrow/go/v10/arrow/memory" "github.com/goccy/go-json" ) // Struct represents an ordered sequence of relative types. type Struct struct { array fields []arrow.Array } // NewStructArray constructs a new Struct Array out of the columns passed // in and the field names. The length of all cols must be the same and // there should be the same number of columns as names. func NewStructArray(cols []arrow.Array, names []string) (*Struct, error) { return NewStructArrayWithNulls(cols, names, nil, 0, 0) } // NewStructArrayWithNulls is like NewStructArray as a convenience function, // but also takes in a null bitmap, the number of nulls, and an optional offset // to use for creating the Struct Array. func NewStructArrayWithNulls(cols []arrow.Array, names []string, nullBitmap *memory.Buffer, nullCount int, offset int) (*Struct, error) { if len(cols) != len(names) { return nil, fmt.Errorf("%w: mismatching number of fields and child arrays", arrow.ErrInvalid) } if len(cols) == 0 { return nil, fmt.Errorf("%w: can't infer struct array length with 0 child arrays", arrow.ErrInvalid) } length := cols[0].Len() children := make([]arrow.ArrayData, len(cols)) fields := make([]arrow.Field, len(cols)) for i, c := range cols { if length != c.Len() { return nil, fmt.Errorf("%w: mismatching child array lengths", arrow.ErrInvalid) } children[i] = c.Data() fields[i].Name = names[i] fields[i].Type = c.DataType() fields[i].Nullable = true } data := NewData(arrow.StructOf(fields...), length, []*memory.Buffer{nullBitmap}, children, nullCount, offset) defer data.Release() return NewStructData(data), nil } // NewStructData returns a new Struct array value from data. func NewStructData(data arrow.ArrayData) *Struct { a := &Struct{} a.refCount = 1 a.setData(data.(*Data)) return a } func (a *Struct) NumField() int { return len(a.fields) } func (a *Struct) Field(i int) arrow.Array { return a.fields[i] } func (a *Struct) String() string { o := new(strings.Builder) o.WriteString("{") structBitmap := a.NullBitmapBytes() for i, v := range a.fields { if i > 0 { o.WriteString(" ") } if arrow.IsUnion(v.DataType().ID()) { fmt.Fprintf(o, "%v", v) continue } else if !bytes.Equal(structBitmap, v.NullBitmapBytes()) { masked := a.newStructFieldWithParentValidityMask(i) fmt.Fprintf(o, "%v", masked) masked.Release() continue } fmt.Fprintf(o, "%v", v) } o.WriteString("}") return o.String() } // newStructFieldWithParentValidityMask returns the Interface at fieldIndex // with a nullBitmapBytes adjusted according on the parent struct nullBitmapBytes. // From the docs: // "When reading the struct array the parent validity bitmap takes priority." func (a *Struct) newStructFieldWithParentValidityMask(fieldIndex int) arrow.Array { field := a.Field(fieldIndex) nullBitmapBytes := field.NullBitmapBytes() maskedNullBitmapBytes := make([]byte, len(nullBitmapBytes)) copy(maskedNullBitmapBytes, nullBitmapBytes) for i := 0; i < field.Len(); i++ { if !a.IsValid(i) { bitutil.ClearBit(maskedNullBitmapBytes, i) } } data := NewSliceData(field.Data(), 0, int64(field.Len())).(*Data) defer data.Release() bufs := make([]*memory.Buffer, len(data.Buffers())) copy(bufs, data.buffers) bufs[0].Release() bufs[0] = memory.NewBufferBytes(maskedNullBitmapBytes) data.buffers = bufs maskedField := MakeFromData(data) return maskedField } func (a *Struct) setData(data *Data) { a.array.setData(data) a.fields = make([]arrow.Array, len(data.childData)) for i, child := range data.childData { if data.offset != 0 || child.Len() != data.length { sub := NewSliceData(child, int64(data.offset), int64(data.offset+data.length)) a.fields[i] = MakeFromData(sub) sub.Release() } else { a.fields[i] = MakeFromData(child) } } } func (a *Struct) getOneForMarshal(i int) interface{} { if a.IsNull(i) { return nil } tmp := make(map[string]interface{}) fieldList := a.data.dtype.(*arrow.StructType).Fields() for j, d := range a.fields { tmp[fieldList[j].Name] = d.(arraymarshal).getOneForMarshal(i) } return tmp } func (a *Struct) MarshalJSON() ([]byte, error) { var buf bytes.Buffer enc := json.NewEncoder(&buf) buf.WriteByte('[') for i := 0; i < a.Len(); i++ { if i != 0 { buf.WriteByte(',') } if err := enc.Encode(a.getOneForMarshal(i)); err != nil { return nil, err } } buf.WriteByte(']') return buf.Bytes(), nil } func arrayEqualStruct(left, right *Struct) bool { for i, lf := range left.fields { rf := right.fields[i] if !Equal(lf, rf) { return false } } return true } func (a *Struct) Retain() { a.array.Retain() for _, f := range a.fields { f.Retain() } } func (a *Struct) Release() { a.array.Release() for _, f := range a.fields { f.Release() } } type StructBuilder struct { builder dtype arrow.DataType fields []Builder } // NewStructBuilder returns a builder, using the provided memory allocator. func NewStructBuilder(mem memory.Allocator, dtype *arrow.StructType) *StructBuilder { b := &StructBuilder{ builder: builder{refCount: 1, mem: mem}, dtype: dtype, fields: make([]Builder, len(dtype.Fields())), } for i, f := range dtype.Fields() { b.fields[i] = NewBuilder(b.mem, f.Type) } return b } func (b *StructBuilder) Type() arrow.DataType { fields := make([]arrow.Field, len(b.fields)) copy(fields, b.dtype.(*arrow.StructType).Fields()) for i, b := range b.fields { fields[i].Type = b.Type() } return arrow.StructOf(fields...) } // Release decreases the reference count by 1. // When the reference count goes to zero, the memory is freed. func (b *StructBuilder) Release() { debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases") if atomic.AddInt64(&b.refCount, -1) == 0 { if b.nullBitmap != nil { b.nullBitmap.Release() b.nullBitmap = nil } for _, f := range b.fields { f.Release() } } } func (b *StructBuilder) Append(v bool) { b.Reserve(1) b.unsafeAppendBoolToBitmap(v) if !v { for _, f := range b.fields { f.AppendNull() } } } func (b *StructBuilder) AppendValues(valids []bool) { b.Reserve(len(valids)) b.builder.unsafeAppendBoolsToBitmap(valids, len(valids)) } func (b *StructBuilder) AppendNull() { b.Append(false) } func (b *StructBuilder) AppendEmptyValue() { b.Append(true) for _, f := range b.fields { f.AppendEmptyValue() } } func (b *StructBuilder) unsafeAppendBoolToBitmap(isValid bool) { if isValid { bitutil.SetBit(b.nullBitmap.Bytes(), b.length) } else { b.nulls++ } b.length++ } func (b *StructBuilder) init(capacity int) { b.builder.init(capacity) } // Reserve ensures there is enough space for appending n elements // by checking the capacity and calling Resize if necessary. func (b *StructBuilder) Reserve(n int) { b.builder.reserve(n, b.resizeHelper) for _, f := range b.fields { f.Reserve(n) } } // Resize adjusts the space allocated by b to n elements. If n is greater than b.Cap(), // additional memory will be allocated. If n is smaller, the allocated memory may reduced. func (b *StructBuilder) Resize(n int) { b.resizeHelper(n) for _, f := range b.fields { f.Resize(n) } } func (b *StructBuilder) resizeHelper(n int) { if n < minBuilderCapacity { n = minBuilderCapacity } if b.capacity == 0 { b.init(n) } else { b.builder.resize(n, b.builder.init) } } func (b *StructBuilder) NumField() int { return len(b.fields) } func (b *StructBuilder) FieldBuilder(i int) Builder { return b.fields[i] } // NewArray creates a Struct array from the memory buffers used by the builder and resets the StructBuilder // so it can be used to build a new array. func (b *StructBuilder) NewArray() arrow.Array { return b.NewStructArray() } // NewStructArray creates a Struct array from the memory buffers used by the builder and resets the StructBuilder // so it can be used to build a new array. func (b *StructBuilder) NewStructArray() (a *Struct) { data := b.newData() a = NewStructData(data) data.Release() return } func (b *StructBuilder) newData() (data *Data) { fields := make([]arrow.ArrayData, len(b.fields)) for i, f := range b.fields { arr := f.NewArray() defer arr.Release() fields[i] = arr.Data() } data = NewData( b.Type(), b.length, []*memory.Buffer{ b.nullBitmap, }, fields, b.nulls, 0, ) b.reset() return } func (b *StructBuilder) unmarshalOne(dec *json.Decoder) error { t, err := dec.Token() if err != nil { return err } switch t { case json.Delim('{'): b.Append(true) keylist := make(map[string]bool) for dec.More() { keyTok, err := dec.Token() if err != nil { return err } key, ok := keyTok.(string) if !ok { return errors.New("missing key") } if keylist[key] { return fmt.Errorf("key %s is specified twice", key) } keylist[key] = true idx, ok := b.dtype.(*arrow.StructType).FieldIdx(key) if !ok { continue } if err := b.fields[idx].unmarshalOne(dec); err != nil { return err } } // Append null values to all optional fields that were not presented in the json input for _, field := range b.dtype.(*arrow.StructType).Fields() { if !field.Nullable { continue } idx, _ := b.dtype.(*arrow.StructType).FieldIdx(field.Name) if _, hasKey := keylist[field.Name]; !hasKey { b.fields[idx].AppendNull() } } // consume '}' _, err := dec.Token() return err case nil: b.AppendNull() default: return &json.UnmarshalTypeError{ Offset: dec.InputOffset(), Struct: fmt.Sprint(b.dtype), } } return nil } func (b *StructBuilder) unmarshal(dec *json.Decoder) error { for dec.More() { if err := b.unmarshalOne(dec); err != nil { return err } } return nil } func (b *StructBuilder) UnmarshalJSON(data []byte) error { dec := json.NewDecoder(bytes.NewReader(data)) t, err := dec.Token() if err != nil { return err } if delim, ok := t.(json.Delim); !ok || delim != '[' { return fmt.Errorf("struct builder must unpack from json array, found %s", delim) } return b.unmarshal(dec) } var ( _ arrow.Array = (*Struct)(nil) _ Builder = (*StructBuilder)(nil) )